You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/12/09 22:30:41 UTC

[pinot] branch master updated: [multistage] fix function usage of casting (#9909)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6abfbfb6f7 [multistage] fix function usage of casting (#9909)
6abfbfb6f7 is described below

commit 6abfbfb6f738fc8b19778595d63d31b2f96a5ae1
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Dec 9 14:30:33 2022 -0800

    [multistage] fix function usage of casting (#9909)
    
    This PR addresses two issues with type-casting.
    1. `dataSchema.ColumnDataType.convert()` assumes the underlying serialized data type, so it should not be used for anything other than numeric columns.
    2. `functionInvoker` only casts the input/operand into the correct parameter Java type; it doesn't do the same for the output.
    
    This PR:
    - fixes the 1st issue by
      - creating a stricter failure mechanism in the boolean checker
      - always doing numeric casting on the input/operand type
    - fixes the 2nd issue by
      - allowing return-type casting in TransformOperator
      - making sure the casting handles null
    - general improvements:
      - merged the filter operand into the transform operand, so that a filter clause can also be used during a transform, such as: `SELECT boolCol AND (intCol > 0) FROM tbl`
      - unified RelDataType resolution to DataSchema.ColumnDataType, PinotDataType and FieldSpec.DataType
    
    TODOs
    - the casting doesn't need to be checked on a per-row basis, because the operator has the return schema; this means the casting can be hard-coded when required, based on the inferred data type
    - add more tests
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../org/apache/pinot/query/catalog/PinotTable.java |   2 +-
 .../query/planner/logical/RelToStageConverter.java |  28 ++-
 .../pinot/query/planner/logical/RexExpression.java |  56 +----
 .../query/planner/logical/RexExpressionUtils.java  |  12 +-
 .../apache/pinot/query/routing/WorkerManager.java  |   2 +-
 .../org/apache/pinot/query/type/TypeSystem.java    |  12 ++
 .../query/mailbox/MultiplexingMailboxService.java  |   2 +-
 .../query/runtime/operator/FilterOperator.java     |  11 +-
 .../query/runtime/operator/HashJoinOperator.java   |  13 +-
 .../LeafStageTransferableBlockOperator.java        |   2 +-
 .../query/runtime/operator/TransformOperator.java  |   4 +-
 .../runtime/operator/operands/FilterOperand.java   | 226 ++++-----------------
 .../runtime/operator/operands/FunctionOperand.java |   2 +-
 .../operator/operands/TransformOperand.java        | 103 +++++++++-
 .../operator/utils/FunctionInvokeUtils.java        |  45 ++++
 .../operator/{ => utils}/OperatorUtils.java        |   2 +-
 .../query/runtime/operator/FilterOperatorTest.java |  13 +-
 .../runtime/operator/TransformOperatorTest.java    |   4 +-
 .../src/test/resources/queries/TypeCasting.json    | 163 +++++++++++++++
 19 files changed, 432 insertions(+), 270 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
index 23e6444e90..36a5f3730c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.query.catalog;
 
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.rel.type.RelDataType;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index fcd19ad4c5..72976185aa 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
@@ -43,6 +44,7 @@ import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.spi.data.FieldSpec;
 
 
 /**
@@ -131,7 +133,7 @@ public final class RelToStageConverter {
       String[] columnNames = recordType.getFieldNames().toArray(new String[]{});
       DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[columnNames.length];
       for (int i = 0; i < columnNames.length; i++) {
-        columnDataTypes[i] = convertColumnDataType(recordType.getFieldList().get(i));
+        columnDataTypes[i] = convertToColumnDataType(recordType.getFieldList().get(i).getType());
       }
       return new DataSchema(columnNames, columnDataTypes);
     } else {
@@ -139,8 +141,8 @@ public final class RelToStageConverter {
     }
   }
 
-  private static DataSchema.ColumnDataType convertColumnDataType(RelDataTypeField relDataTypeField) {
-    switch (relDataTypeField.getType().getSqlTypeName()) {
+  public static DataSchema.ColumnDataType convertToColumnDataType(RelDataType relDataType) {
+    switch (relDataType.getSqlTypeName()) {
       case BOOLEAN:
         return DataSchema.ColumnDataType.BOOLEAN;
       case TINYINT:
@@ -150,7 +152,7 @@ public final class RelToStageConverter {
       case BIGINT:
         return DataSchema.ColumnDataType.LONG;
       case DECIMAL:
-        return resolveDecimal(relDataTypeField);
+        return resolveDecimal(relDataType);
       case FLOAT:
         return DataSchema.ColumnDataType.FLOAT;
       case REAL:
@@ -167,21 +169,29 @@ public final class RelToStageConverter {
       case VARBINARY:
         return DataSchema.ColumnDataType.BYTES;
       default:
-        throw new IllegalStateException("Unexpected RelDataTypeField: " + relDataTypeField.getType() + " for column: "
-            + relDataTypeField.getName());
+        return DataSchema.ColumnDataType.BYTES;
     }
   }
 
+  public static FieldSpec.DataType convertToFieldSpecDataType(RelDataType relDataType) {
+    return convertToColumnDataType(relDataType).toDataType();
+  }
+
+  public static PinotDataType convertToPinotDataType(RelDataType relDataType) {
+    return PinotDataType.getPinotDataTypeForExecution(convertToColumnDataType(relDataType));
+  }
+
   /**
    * Calcite uses DEMICAL type to infer data type hoisting and infer arithmetic result types. down casting this
    * back to the proper primitive type for Pinot.
    *
    * @param relDataType the DECIMAL rel data type.
    * @return proper {@link DataSchema.ColumnDataType}.
+   * @see {@link org.apache.calcite.rel.type.RelDataTypeFactoryImpl#decimalOf}.
    */
-  private static DataSchema.ColumnDataType resolveDecimal(RelDataTypeField relDataType) {
-    int precision = relDataType.getType().getPrecision();
-    int scale = relDataType.getType().getScale();
+  private static DataSchema.ColumnDataType resolveDecimal(RelDataType relDataType) {
+    int precision = relDataType.getPrecision();
+    int scale = relDataType.getScale();
     if (scale == 0) {
       if (precision <= 10) {
         return DataSchema.ColumnDataType.INT;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index e9a4a99679..7ef42ff7c9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -22,14 +22,12 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.NlsString;
-import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -49,7 +47,7 @@ public interface RexExpression {
       return new RexExpression.InputRef(((RexInputRef) rexNode).getIndex());
     } else if (rexNode instanceof RexLiteral) {
       RexLiteral rexLiteral = ((RexLiteral) rexNode);
-      FieldSpec.DataType dataType = toDataType(rexLiteral.getType());
+      FieldSpec.DataType dataType = RelToStageConverter.convertToFieldSpecDataType(rexLiteral.getType());
       return new RexExpression.Literal(dataType, toRexValue(dataType, rexLiteral.getValue()));
     } else if (rexNode instanceof RexCall) {
       RexCall rexCall = (RexCall) rexNode;
@@ -70,37 +68,17 @@ public interface RexExpression {
       default:
         List<RexExpression> operands =
             rexCall.getOperands().stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
-        return new RexExpression.FunctionCall(rexCall.getKind(), toDataType(rexCall.getType()),
+        return new RexExpression.FunctionCall(rexCall.getKind(),
+            RelToStageConverter.convertToFieldSpecDataType(rexCall.getType()),
             rexCall.getOperator().getName(), operands);
     }
   }
 
-  static PinotDataType toPinotDataType(RelDataType type) {
-    switch (type.getSqlTypeName()) {
-      case INTEGER:
-        return PinotDataType.INTEGER;
-      case BIGINT:
-        return PinotDataType.LONG;
-      case FLOAT:
-        return PinotDataType.FLOAT;
-      // TODO: support DECIMAL properly.
-      case DECIMAL:
-      case DOUBLE:
-        return PinotDataType.DOUBLE;
-      case CHAR:
-      case VARCHAR:
-        return PinotDataType.STRING;
-      case BOOLEAN:
-        return PinotDataType.BOOLEAN;
-      default:
-        throw new IllegalArgumentException("Unsupported data type: " + type);
-    }
-  }
-
   static RexExpression toRexExpression(AggregateCall aggCall) {
     List<RexExpression> operands = aggCall.getArgList().stream().map(InputRef::new).collect(Collectors.toList());
-    return new RexExpression.FunctionCall(aggCall.getAggregation().getKind(), toDataType(aggCall.getType()),
-        aggCall.getAggregation().getName(), operands);
+    return new RexExpression.FunctionCall(aggCall.getAggregation().getKind(),
+        RelToStageConverter.convertToFieldSpecDataType(aggCall.getType()), aggCall.getAggregation().getName(),
+        operands);
   }
 
   static Object toRexValue(FieldSpec.DataType dataType, Comparable value) {
@@ -121,28 +99,6 @@ public interface RexExpression {
     }
   }
 
-  static FieldSpec.DataType toDataType(RelDataType type) {
-    switch (type.getSqlTypeName()) {
-      case INTEGER:
-        return FieldSpec.DataType.INT;
-      case BIGINT:
-        return FieldSpec.DataType.LONG;
-      case FLOAT:
-        return FieldSpec.DataType.FLOAT;
-      case DECIMAL:
-      case DOUBLE:
-        return FieldSpec.DataType.DOUBLE;
-      case CHAR:
-      case VARCHAR:
-        return FieldSpec.DataType.STRING;
-      case BOOLEAN:
-        return FieldSpec.DataType.BOOLEAN;
-      default:
-        // TODO: do not assume byte type.
-        return FieldSpec.DataType.BYTES;
-    }
-  }
-
   class InputRef implements RexExpression {
     @ProtoProperties
     private SqlKind _sqlKind;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index 314b0baa54..364da7c164 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -43,7 +43,8 @@ public class RexExpressionUtils {
   static RexExpression handleCase(RexCall rexCall) {
     List<RexExpression> operands =
         rexCall.getOperands().stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
-    return new RexExpression.FunctionCall(rexCall.getKind(), RexExpression.toDataType(rexCall.getType()),
+    return new RexExpression.FunctionCall(rexCall.getKind(),
+        RelToStageConverter.convertToFieldSpecDataType(rexCall.getType()),
         "caseWhen", operands);
   }
 
@@ -54,9 +55,10 @@ public class RexExpressionUtils {
         rexCall.getOperands().stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
     Preconditions.checkState(operands.size() == 1, "CAST takes exactly 2 arguments");
     RelDataType castType = rexCall.getType();
-    operands.add(new RexExpression.Literal(FieldSpec.DataType.STRING, RexExpression.toPinotDataType(castType).name()));
-    return new RexExpression.FunctionCall(rexCall.getKind(), RexExpression.toDataType(castType), "CAST",
-        operands);
+    operands.add(new RexExpression.Literal(FieldSpec.DataType.STRING,
+        RelToStageConverter.convertToFieldSpecDataType(castType).name()));
+    return new RexExpression.FunctionCall(rexCall.getKind(), RelToStageConverter.convertToFieldSpecDataType(castType),
+        "CAST", operands);
   }
 
   // TODO: Add support for range filter expressions (e.g. a > 0 and a < 30)
@@ -64,7 +66,7 @@ public class RexExpressionUtils {
     List<RexNode> operands = rexCall.getOperands();
     RexInputRef rexInputRef = (RexInputRef) operands.get(0);
     RexLiteral rexLiteral = (RexLiteral) operands.get(1);
-    FieldSpec.DataType dataType = RexExpression.toDataType(rexLiteral.getType());
+    FieldSpec.DataType dataType = RelToStageConverter.convertToFieldSpecDataType(rexLiteral.getType());
     Sarg sarg = rexLiteral.getValueAs(Sarg.class);
     if (sarg.isPoints()) {
       return new RexExpression.FunctionCall(SqlKind.IN, dataType, SqlKind.IN.name(), toFunctionOperands(rexInputRef,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 42bb19d269..36c0f2bd81 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.query.routing;
 
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collection;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
index ea6a9e7a35..5476c8ad23 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
@@ -25,6 +25,8 @@ import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
  * The {@code TypeSystem} overwrites Calcite type system with Pinot specific logics.
  */
 public class TypeSystem extends RelDataTypeSystemImpl {
+  private static final int MAX_DECIMAL_SCALE_DIGIT = 1000;
+  private static final int MAX_DECIMAL_PRECISION_DIGIT = 1000;
 
   @Override
   public boolean shouldConvertRaggedUnionTypesToVarying() {
@@ -38,4 +40,14 @@ public class TypeSystem extends RelDataTypeSystemImpl {
     // behavior. This calcite flag will cause this to be cast to VARCHAR instead
     return true;
   }
+
+  @Override
+  public int getMaxNumericScale() {
+    return MAX_DECIMAL_SCALE_DIGIT;
+  }
+
+  @Override
+  public int getMaxNumericPrecision() {
+    return MAX_DECIMAL_PRECISION_DIGIT;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
index f8bf271233..e80a65ce71 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
 import java.util.function.Consumer;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index 66956a95ec..32fe4508cf 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -28,7 +28,9 @@ import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
+import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+
 
 /*
    FilterOperator apply filter on rows from upstreamOperator.
@@ -46,14 +48,14 @@ import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
 public class FilterOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "FILTER";
   private final Operator<TransferableBlock> _upstreamOperator;
-  private final FilterOperand _filterOperand;
+  private final TransformOperand _filterOperand;
   private final DataSchema _dataSchema;
   private TransferableBlock _upstreamErrorBlock;
 
   public FilterOperator(Operator<TransferableBlock> upstreamOperator, DataSchema dataSchema, RexExpression filter) {
     _upstreamOperator = upstreamOperator;
     _dataSchema = dataSchema;
-    _filterOperand = FilterOperand.toFilterOperand(filter, dataSchema);
+    _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
     _upstreamErrorBlock = null;
   }
 
@@ -78,6 +80,7 @@ public class FilterOperator extends BaseOperator<TransferableBlock> {
     }
   }
 
+  @SuppressWarnings("ConstantConditions")
   private TransferableBlock transform(TransferableBlock block)
       throws Exception {
     if (_upstreamErrorBlock != null) {
@@ -92,7 +95,7 @@ public class FilterOperator extends BaseOperator<TransferableBlock> {
     List<Object[]> resultRows = new ArrayList<>();
     List<Object[]> container = block.getContainer();
     for (Object[] row : container) {
-      if (_filterOperand.apply(row)) {
+      if ((Boolean) FunctionInvokeUtils.convert(_filterOperand.apply(row), DataSchema.ColumnDataType.BOOLEAN)) {
         resultRows.add(row);
       }
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index a0bc9329c7..89ff2f1abb 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,7 +37,8 @@ import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
+import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
 
 
 /**
@@ -61,7 +62,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   private final JoinRelType _joinType;
   private final DataSchema _resultSchema;
   private final int _resultRowSize;
-  private final List<FilterOperand> _joinClauseEvaluators;
+  private final List<TransformOperand> _joinClauseEvaluators;
   private boolean _isHashTableBuilt;
   private TransferableBlock _upstreamErrorBlock;
   private KeySelector<Object[], Object[]> _leftKeySelector;
@@ -80,7 +81,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
     _resultSchema = outputSchema;
     _joinClauseEvaluators = new ArrayList<>(joinClauses.size());
     for (RexExpression joinClause : joinClauses) {
-      _joinClauseEvaluators.add(FilterOperand.toFilterOperand(joinClause, _resultSchema));
+      _joinClauseEvaluators.add(TransformOperand.toTransformOperand(joinClause, _resultSchema));
     }
     _joinType = joinType;
     _resultRowSize = _resultSchema.size();
@@ -168,8 +169,8 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
         for (Object[] rightRow : hashCollection) {
           // TODO: Optimize this to avoid unnecessary object copy.
           Object[] resultRow = joinRow(leftRow, rightRow);
-          if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream()
-              .allMatch(evaluator -> evaluator.apply(resultRow))) {
+          if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(evaluator ->
+              (Boolean) FunctionInvokeUtils.convert(evaluator.apply(resultRow), DataSchema.ColumnDataType.BOOLEAN))) {
             rows.add(resultRow);
           }
         }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 41f8b4535e..582e85a9ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index e00d4a9eed..80af357cec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -30,6 +30,7 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
 
 
 /**
@@ -104,7 +105,8 @@ public class TransformOperator extends BaseOperator<TransferableBlock> {
     for (Object[] row : container) {
       Object[] resultRow = new Object[_resultColumnSize];
       for (int i = 0; i < _resultColumnSize; i++) {
-        resultRow[i] = _transformOperandsList.get(i).apply(row);
+        resultRow[i] = FunctionInvokeUtils.convert(_transformOperandsList.get(i).apply(row),
+            _resultSchema.getColumnDataType(i));
       }
       resultRows.add(resultRow);
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
index 169643b24b..37a932c594 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
@@ -18,167 +18,33 @@
  */
 package org.apache.pinot.query.runtime.operator.operands;
 
-import com.clearspring.analytics.util.Preconditions;
+
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.runtime.operator.OperatorUtils;
-import org.apache.pinot.spi.data.FieldSpec;
 
 
 public abstract class FilterOperand extends TransformOperand {
 
-  public static FilterOperand toFilterOperand(RexExpression rexExpression, DataSchema dataSchema) {
-    if (rexExpression instanceof RexExpression.FunctionCall) {
-      return toFilterOperand((RexExpression.FunctionCall) rexExpression, dataSchema);
-    } else if (rexExpression instanceof RexExpression.InputRef) {
-      return toFilterOperand((RexExpression.InputRef) rexExpression, dataSchema);
-    } else if (rexExpression instanceof RexExpression.Literal) {
-      return toFilterOperand((RexExpression.Literal) rexExpression);
-    } else {
-      throw new UnsupportedOperationException("Unsupported expression on filter conversion: " + rexExpression);
-    }
-  }
-
-  private static FilterOperand toFilterOperand(RexExpression.Literal literal) {
-    return new BooleanLiteral(literal);
-  }
-
-  private static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, DataSchema dataSchema) {
-    return new BooleanInputRef(inputRef, dataSchema);
-  }
-
-  private static FilterOperand toFilterOperand(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
-    int operandSize = functionCall.getFunctionOperands().size();
-    // TODO: Move these functions out of this class.
-    switch (OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
-      case "AND":
-        Preconditions.checkState(operandSize >= 2, "AND takes >=2 argument, passed in argument size:" + operandSize);
-        return new And(functionCall.getFunctionOperands(), dataSchema);
-      case "OR":
-        Preconditions.checkState(operandSize >= 2, "OR takes >=2 argument, passed in argument size:" + operandSize);
-        return new Or(functionCall.getFunctionOperands(), dataSchema);
-      case "NOT":
-        Preconditions.checkState(operandSize == 1, "NOT takes one argument, passed in argument size:" + operandSize);
-        return new Not(toFilterOperand(functionCall.getFunctionOperands().get(0), dataSchema));
-      case "equals":
-        return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
-          @Override
-          public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
-                == 0;
-          }
-        };
-      case "notEquals":
-        return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
-          @Override
-          public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
-                != 0;
-          }
-        };
-      case "greaterThan":
-        return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
-          @Override
-          public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
-                > 0;
-          }
-        };
-      case "greaterThanOrEqual":
-        return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
-          @Override
-          public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
-                >= 0;
-          }
-        };
-      case "lessThan":
-        return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
-          @Override
-          public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
-                < 0;
-          }
-        };
-      case "lessThanOrEqual":
-        return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
-          @Override
-          public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
-                <= 0;
-          }
-        };
-      default:
-        return new BooleanFunction(functionCall, dataSchema);
-    }
-  }
-
   @Override
   public abstract Boolean apply(Object[] row);
 
-  private static class BooleanFunction extends FilterOperand {
-    private final FunctionOperand _func;
+  public static class And extends FilterOperand {
+    List<TransformOperand> _childOperands;
 
-    public BooleanFunction(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
-      FunctionOperand func = (FunctionOperand) TransformOperand.toTransformOperand(functionCall, dataSchema);
-      Preconditions.checkState(func.getResultType() == DataSchema.ColumnDataType.BOOLEAN,
-          "Expecting boolean result type but got type:" + func.getResultType());
-      _func = func;
-    }
-
-    @Override
-    public Boolean apply(Object[] row) {
-      return (Boolean) _func.apply(row);
-    }
-  }
-
-  private static class BooleanInputRef extends FilterOperand {
-    private final RexExpression.InputRef _inputRef;
-
-    public BooleanInputRef(RexExpression.InputRef inputRef, DataSchema dataSchema) {
-      DataSchema.ColumnDataType inputType = dataSchema.getColumnDataType(inputRef.getIndex());
-      Preconditions.checkState(inputType == DataSchema.ColumnDataType.BOOLEAN,
-          "Input has to be boolean type but got type:" + inputType);
-      _inputRef = inputRef;
-    }
-
-    @Override
-    public Boolean apply(Object[] row) {
-      return (boolean) row[_inputRef.getIndex()];
-    }
-  }
-
-  private static class BooleanLiteral extends FilterOperand {
-    private final Object _literalValue;
-
-    public BooleanLiteral(RexExpression.Literal literal) {
-      Preconditions.checkState(literal.getDataType() == FieldSpec.DataType.BOOLEAN,
-          "Only boolean literal is supported as filter, but got type:" + literal.getDataType());
-      _literalValue = literal.getValue();
-    }
-
-    @Override
-    public Boolean apply(Object[] row) {
-      return (boolean) _literalValue;
-    }
-  }
-
-  private static class And extends FilterOperand {
-    List<FilterOperand> _childOperands;
-
-    public And(List<RexExpression> childExprs, DataSchema dataSchema) {
+    public And(List<RexExpression> childExprs, DataSchema inputDataSchema) {
       _childOperands = new ArrayList<>(childExprs.size());
       for (RexExpression childExpr : childExprs) {
-        _childOperands.add(toFilterOperand(childExpr, dataSchema));
+        _childOperands.add(toTransformOperand(childExpr, inputDataSchema));
       }
     }
 
     @Override
     public Boolean apply(Object[] row) {
-      for (FilterOperand child : _childOperands) {
-        if (!child.apply(row)) {
+      for (TransformOperand child : _childOperands) {
+        if (!(Boolean) child.apply(row)) {
           return false;
         }
       }
@@ -186,20 +52,20 @@ public abstract class FilterOperand extends TransformOperand {
     }
   }
 
-  private static class Or extends FilterOperand {
-    List<FilterOperand> _childOperands;
+  public static class Or extends FilterOperand {
+    List<TransformOperand> _childOperands;
 
-    public Or(List<RexExpression> childExprs, DataSchema dataSchema) {
+    public Or(List<RexExpression> childExprs, DataSchema inputDataSchema) {
       _childOperands = new ArrayList<>(childExprs.size());
       for (RexExpression childExpr : childExprs) {
-        _childOperands.add(toFilterOperand(childExpr, dataSchema));
+        _childOperands.add(toTransformOperand(childExpr, inputDataSchema));
       }
     }
 
     @Override
     public Boolean apply(Object[] row) {
-      for (FilterOperand child : _childOperands) {
-        if (child.apply(row)) {
+      for (TransformOperand child : _childOperands) {
+        if ((Boolean) child.apply(row)) {
           return true;
         }
       }
@@ -207,34 +73,28 @@ public abstract class FilterOperand extends TransformOperand {
     }
   }
 
-  private static class Not extends FilterOperand {
-    FilterOperand _childOperand;
+  public static class Not extends FilterOperand {
+    TransformOperand _childOperand;
 
-    public Not(FilterOperand childOperand) {
-      _childOperand = childOperand;
+    public Not(RexExpression childExpr, DataSchema inputDataSchema) {
+      _childOperand = toTransformOperand(childExpr, inputDataSchema);
     }
 
     @Override
     public Boolean apply(Object[] row) {
-      return !_childOperand.apply(row);
+      return !(Boolean) _childOperand.apply(row);
     }
   }
 
-  private static abstract class Predicate extends FilterOperand {
+  public static abstract class Predicate extends FilterOperand {
     protected final TransformOperand _lhs;
     protected final TransformOperand _rhs;
-    protected final DataSchema.ColumnDataType _resultType;
-
-    public Predicate(List<RexExpression> functionOperands, DataSchema dataSchema) {
-      Preconditions.checkState(functionOperands.size() == 2,
-          "Expected 2 function ops for Predicate but got:" + functionOperands.size());
-      _lhs = TransformOperand.toTransformOperand(functionOperands.get(0), dataSchema);
-      _rhs = TransformOperand.toTransformOperand(functionOperands.get(1), dataSchema);
-      _resultType = resolveResultType(_lhs._resultType, _rhs._resultType);
-    }
+    protected final boolean _requireCasting;
+    protected final DataSchema.ColumnDataType _commonCastType;
 
     /**
-     * Resolve data type, since we don't have a exhausted list of filter function signatures. we rely on type casting.
+     * The Predicate constructor also resolves the common data type: since we don't have an exhaustive list of
+     * filter function signatures, we rely on type casting.
      *
      * <ul>
      *   <li>if both RHS and LHS has null data type, exception occurs.</li>
@@ -243,28 +103,32 @@ public abstract class FilterOperand extends TransformOperand {
      *   <li>if we can't resolve a common data type, exception occurs.</li>
      * </ul>
      *
-     * @param lhsType left-hand-side type
-     * @param rhsType right-hand-side type
-     * @return best common conversion data type.
-     * @see DataSchema.ColumnDataType#isSuperTypeOf(DataSchema.ColumnDataType)
+     *
      */
-    private static DataSchema.ColumnDataType resolveResultType(DataSchema.ColumnDataType lhsType,
-        DataSchema.ColumnDataType rhsType) {
+    public Predicate(List<RexExpression> functionOperands, DataSchema inputDataSchema) {
+      Preconditions.checkState(functionOperands.size() == 2,
+          "Expected 2 function ops for Predicate but got:" + functionOperands.size());
+      _lhs = TransformOperand.toTransformOperand(functionOperands.get(0), inputDataSchema);
+      _rhs = TransformOperand.toTransformOperand(functionOperands.get(1), inputDataSchema);
+
       // TODO: Correctly throw exception instead of returning null.
       // Currently exception thrown during constructor is not piped back to query dispatcher, thus in order to
       // avoid silent failure, we deliberately set to null here, make the exception thrown during data processing.
-      if (lhsType == null && rhsType == null) {
-        return null;
-      } else if (lhsType == null || lhsType == DataSchema.ColumnDataType.OBJECT) {
-        return rhsType;
-      } else if (rhsType == null || rhsType == DataSchema.ColumnDataType.OBJECT) {
-        return lhsType;
-      } else if (lhsType.isSuperTypeOf(rhsType)) {
-        return lhsType;
-      } else if (rhsType.isSuperTypeOf(rhsType)) {
-        return rhsType;
+      // TODO: right now all numeric columns are still converted, because even if the inputDataSchema asks for
+      // one of the number types, the payload might not contain that exact type.
+      if (_lhs._resultType == null || _lhs._resultType == DataSchema.ColumnDataType.OBJECT
+          || _rhs._resultType == null || _rhs._resultType == DataSchema.ColumnDataType.OBJECT) {
+        _requireCasting = false;
+        _commonCastType = null;
+      } else if (_lhs._resultType.isSuperTypeOf(_rhs._resultType)) {
+        _requireCasting = _lhs._resultType != _rhs._resultType || _lhs._resultType.isNumber();
+        _commonCastType = _lhs._resultType;
+      } else if (_rhs._resultType.isSuperTypeOf(_lhs._resultType)) {
+        _requireCasting = _lhs._resultType != _rhs._resultType || _rhs._resultType.isNumber();
+        _commonCastType = _rhs._resultType;
       } else {
-        return null;
+        _requireCasting = false;
+        _commonCastType = null;
       }
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
index 029f5ddaad..b5852bad11 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.function.FunctionRegistry;
 import org.apache.pinot.common.function.FunctionUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.runtime.operator.OperatorUtils;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 
 /*
  * FunctionOperands are generated from {@link RexExpression}s.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
index 7618f74aea..0fc17ba407 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
@@ -18,19 +18,24 @@
  */
 package org.apache.pinot.query.runtime.operator.operands;
 
+
+import com.google.common.base.Preconditions;
+import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 
 
 public abstract class TransformOperand {
   protected String _resultName;
   protected DataSchema.ColumnDataType _resultType;
 
-  public static TransformOperand toTransformOperand(RexExpression rexExpression, DataSchema dataSchema) {
+  public static TransformOperand toTransformOperand(RexExpression rexExpression, DataSchema inputDataSchema) {
     if (rexExpression instanceof RexExpression.InputRef) {
-      return new ReferenceOperand((RexExpression.InputRef) rexExpression, dataSchema);
+      return new ReferenceOperand((RexExpression.InputRef) rexExpression, inputDataSchema);
     } else if (rexExpression instanceof RexExpression.FunctionCall) {
-      return new FunctionOperand((RexExpression.FunctionCall) rexExpression, dataSchema);
+      return toTransformOperand((RexExpression.FunctionCall) rexExpression, inputDataSchema);
     } else if (rexExpression instanceof RexExpression.Literal) {
       return new LiteralOperand((RexExpression.Literal) rexExpression);
     } else {
@@ -38,6 +43,98 @@ public abstract class TransformOperand {
     }
   }
 
+  @SuppressWarnings({"ConstantConditions", "rawtypes", "unchecked"})
+  private static TransformOperand toTransformOperand(RexExpression.FunctionCall functionCall,
+      DataSchema inputDataSchema) {
+    final List<RexExpression> functionOperands = functionCall.getFunctionOperands();
+    int operandSize = functionOperands.size();
+    switch (OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
+      case "AND":
+        Preconditions.checkState(operandSize >= 2, "AND takes >=2 argument, passed in argument size:" + operandSize);
+        return new FilterOperand.And(functionOperands, inputDataSchema);
+      case "OR":
+        Preconditions.checkState(operandSize >= 2, "OR takes >=2 argument, passed in argument size:" + operandSize);
+        return new FilterOperand.Or(functionOperands, inputDataSchema);
+      case "NOT":
+        Preconditions.checkState(operandSize == 1, "NOT takes one argument, passed in argument size:" + operandSize);
+        return new FilterOperand.Not(functionOperands.get(0), inputDataSchema);
+      case "equals":
+        return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+          @Override
+          public Boolean apply(Object[] row) {
+            if (_requireCasting) {
+              return ((Comparable) FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+                  FunctionInvokeUtils.convert(_rhs.apply(row), _commonCastType)) == 0;
+            } else {
+              return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row)) == 0;
+            }
+          }
+        };
+      case "notEquals":
+        return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+          @Override
+          public Boolean apply(Object[] row) {
+            if (_requireCasting) {
+              return ((Comparable) FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+                  FunctionInvokeUtils.convert(_rhs.apply(row), _commonCastType)) != 0;
+            } else {
+              return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row)) != 0;
+            }
+          }
+        };
+      case "greaterThan":
+        return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+          @Override
+          public Boolean apply(Object[] row) {
+            if (_requireCasting) {
+              return ((Comparable) FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+                  FunctionInvokeUtils.convert(_rhs.apply(row), _commonCastType)) > 0;
+            } else {
+              return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row)) > 0;
+            }
+          }
+        };
+      case "greaterThanOrEqual":
+        return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+          @Override
+          public Boolean apply(Object[] row) {
+            if (_requireCasting) {
+              return ((Comparable) FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+                  FunctionInvokeUtils.convert(_rhs.apply(row), _commonCastType)) >= 0;
+            } else {
+              return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row)) >= 0;
+            }
+          }
+        };
+      case "lessThan":
+        return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+          @Override
+          public Boolean apply(Object[] row) {
+            if (_requireCasting) {
+              return ((Comparable) FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+                  FunctionInvokeUtils.convert(_rhs.apply(row), _commonCastType)) < 0;
+            } else {
+              return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row)) < 0;
+            }
+          }
+        };
+      case "lessThanOrEqual":
+        return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+          @Override
+          public Boolean apply(Object[] row) {
+            if (_requireCasting) {
+              return ((Comparable) FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+                  FunctionInvokeUtils.convert(_rhs.apply(row), _commonCastType)) <= 0;
+            } else {
+              return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row)) <= 0;
+            }
+          }
+        };
+      default:
+        return new FunctionOperand(functionCall, inputDataSchema);
+    }
+  }
+
   public String getResultName() {
     return _resultName;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/FunctionInvokeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/FunctionInvokeUtils.java
new file mode 100644
index 0000000000..de26cdfab4
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/FunctionInvokeUtils.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.utils;
+
+import org.apache.pinot.common.utils.DataSchema;
+
+
+public class FunctionInvokeUtils {
+
+  private FunctionInvokeUtils() {
+    // do not instantiate.
+  }
+
+  /**
+   * Convert result to the appropriate column data type according to the desired {@link DataSchema.ColumnDataType}
+   * of the {@link org.apache.pinot.core.common.Operator}.
+   *
+   * @param inputObj input entry
+   * @param columnDataType desired column data type
+   * @return converted entry
+   */
+  public static Object convert(Object inputObj, DataSchema.ColumnDataType columnDataType) {
+    if (columnDataType.isNumber() && columnDataType != DataSchema.ColumnDataType.BIG_DECIMAL) {
+      return inputObj == null ? null : columnDataType.convert(inputObj);
+    } else {
+      return inputObj;
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
similarity index 97%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index 2bad078e3f..a5042c99bb 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.operator;
+package org.apache.pinot.query.runtime.operator.utils;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index 7b1f5bce5f..1e78233234 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -130,7 +130,7 @@ public class FilterOperatorTest {
     Assert.assertTrue(result.isEmpty());
   }
 
-  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*boolean literal.*")
+  @Test
   public void shouldThrowOnNonBooleanTypeBooleanLiteral() {
     RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "false");
     DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
@@ -139,10 +139,13 @@ public class FilterOperatorTest {
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
     FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+    TransferableBlock errorBlock = op.getNextBlock();
+    Assert.assertTrue(errorBlock.isErrorBlock());
+    DataBlock data = errorBlock.getDataBlock();
+    Assert.assertTrue(data.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("cast"));
   }
 
-  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Input has to be "
-      + "boolean type.*")
+  @Test
   public void shouldThrowOnNonBooleanTypeInputRef() {
     RexExpression ref0 = new RexExpression.InputRef(0);
     DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
@@ -151,6 +154,10 @@ public class FilterOperatorTest {
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
     FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0);
+    TransferableBlock errorBlock = op.getNextBlock();
+    Assert.assertTrue(errorBlock.isErrorBlock());
+    DataBlock data = errorBlock.getDataBlock();
+    Assert.assertTrue(data.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("cast"));
   }
 
   @Test
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
index 868ec3b9b0..9c6370b5f5 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -194,8 +194,8 @@ public class TransformOperatorTest {
         }));
     RexExpression.Literal boolLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
     RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str");
-    DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+    DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "strCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING});
     TransformOperator op =
         new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
     TransferableBlock result = op.nextBlock();
diff --git a/pinot-query-runtime/src/test/resources/queries/TypeCasting.json b/pinot-query-runtime/src/test/resources/queries/TypeCasting.json
new file mode 100644
index 0000000000..f134cfd4b2
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/TypeCasting.json
@@ -0,0 +1,163 @@
+{
+  "explicit_cast": {
+    "comment": "explicit type casting for operations",
+    "tables": {
+      "tbl": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "longCol", "type": "LONG"},
+          {"name": "floatCol", "type": "FLOAT"},
+          {"name": "doubleCol", "type": "DOUBLE"},
+          {"name": "bigDecimalCol", "type": "BIG_DECIMAL"},
+          {"name": "boolCol", "type": "BOOLEAN"},
+          {"name": "timestampCol", "type": "TIMESTAMP"},
+          {"name": "stringCol", "type": "STRING"},
+          {"name": "bytesCol", "type": "BYTES"}
+        ],
+        "inputs": [
+          [1, 14, 3.0, 5.176518e16, "1e505", true, "1970-01-01 01:02:03.456", "lyons", "DEADBEEF"],
+          [2, 21, 4.0, 4.608155e11, "1e350", true, "1999-01-08 22:05:46", "onan", "DE12BEEF"],
+          [3, 14, 5.0, 1.249261e11, "1e505", false, "1999-01-08 04:05:06.001", "rudvalis", "A000"],
+          [4, 21, 5.0, 8.677557e19, "1e404", false, "5760-01-01 04:05:06", "janko", "FEEE"],
+          [1, 41, 2.0, 4.154786e33, "1e505", true, "2022-01-02 03:45:00", "baby", "1000"],
+          [2, 46, 1.0, 8.080171e53, "1e401", false, "1969-12-31 23:59:59", "monster", "00"]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "all types should conform when directly selected out",
+        "sql": "SELECT * FROM {tbl}"
+      },
+      {
+        "description": "all types should conform when directly selected out & transfer between stages",
+        "sql": "SELECT * FROM {tbl} AS a JOIN {tbl} AS b ON a.intCol = b.intCol WHERE a.boolCol = true AND b.boolCol = false"
+      },
+      { "sql": "SELECT CAST(floatCol AS DOUBLE) * 1e100, CAST(intCol AS BIGINT) * 2000000000, CAST(longCol AS DOUBLE) * 1e100, CAST(boolCol AS INT) FROM {tbl}" },
+      { "sql": "SELECT CAST(a.floatCol AS DOUBLE) * 1e100, CAST(a.intCol AS BIGINT) * 2000000000, CAST(b.longCol AS FLOAT) * 1e20, CAST(a.boolCol AS INT) FROM {tbl} AS a JOIN {tbl} AS b ON a.intCol = b.intCol WHERE a.boolCol = true" },
+      {
+        "ignored": true,
+        "comments": "primitive cast not work: cast as DECIMAL doesn't work",
+        "sql": "SELECT CAST(doubleCol AS DECIMAL) FROM {tbl} WHERE bigDecimalCol > 0 AND CAST(bytesCol AS VARCHAR) != '1000'"
+      },
+      {
+        "ignored": true,
+        "comments": "special cast not work: timestamp cast not supported, varchar cast not supported",
+        "sql": "SELECT CAST(b.timestampCol AS BIGINT), CAST(stringCol AS VARBINARY) FROM {tbl}"
+      }
+    ]
+  },
+  "function_operand_casting": {
+    "comment": "built-in function from SqlStdOperatorTable with implicit/explicit casting",
+    "tables": {
+      "tbl": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "longCol", "type": "LONG"},
+          {"name": "floatCol", "type": "FLOAT"},
+          {"name": "doubleCol", "type": "DOUBLE"},
+          {"name": "bigDecimalCol", "type": "BIG_DECIMAL"},
+          {"name": "boolCol", "type": "BOOLEAN"},
+          {"name": "timestampCol", "type": "TIMESTAMP"},
+          {"name": "stringCol", "type": "STRING"},
+          {"name": "bytesCol", "type": "BYTES"}
+        ],
+        "inputs": [
+          [1, 14, 3.0, 5.176518e16, "1e505", true, "1970-01-01 01:02:03.456", "lyons", "DEADBEEF"],
+          [2, 21, 4.0, 4.608155e11, "1e350", true, "1999-01-08 22:05:46", "onan", "DE12BEEF"],
+          [3, 14, 5.0, 1.249261e11, "1e505", false, "1999-01-08 04:05:06.001", "rudvalis", "A000"],
+          [4, 21, 5.0, 8.677557e19, "1e404", false, "5760-01-01 04:05:06", "janko", "FEEE"],
+          [1, 41, 2.0, 4.154786e33, "1e505", true, "2022-01-02 03:45:00", "baby", "1000"],
+          [2, 46, 1.0, 8.080171e53, "1e401", false, "1969-12-31 23:59:59", "monster", "00"]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "test (1) SqlStdOperator with Pinot ScalarFunction (UPPER, SQRT); (2) SqlStdOperator with Pinot ScalarFunction multiple argument (POWER); (3) SqlStdOperator with Pinot Filter (AND, =), (4) SqlStdOperator with Pinot operator (+) ",
+        "sql": "SELECT UPPER(stringCol), POWER(intCol, 2), SQRT(longCol), boolCol AND (intCol = 1), floatCol + 10 FROM {tbl}"
+      },
+      {
+        "description": "test SqlStdOperator with 4 Pinot variances above, but also mixed in with intermediate stage transfer",
+        "sql": "SELECT LOWER(a.stringCol), POWER(a.longCol, 3), SQRT(b.intCol), a.boolCol AND (b.intCol = 1), a.floatCol + b.doubleCol FROM {tbl} AS a JOIN {tbl} AS b ON a.intCol = b.intCol"
+      }
+    ]
+  },
+  "udf_argument_casting": {
+    "comment": "user-defined (or @ScalarFunction annotated) function with implicit/explicit casting",
+    "tables": {
+      "tbl": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "longCol", "type": "LONG"},
+          {"name": "floatCol", "type": "FLOAT"},
+          {"name": "doubleCol", "type": "DOUBLE"},
+          {"name": "bigDecimalCol", "type": "BIG_DECIMAL"},
+          {"name": "boolCol", "type": "BOOLEAN"},
+          {"name": "timestampCol", "type": "TIMESTAMP"},
+          {"name": "timestampStringCol", "type": "TIMESTAMP"},
+          {"name": "stringCol", "type": "STRING"},
+          {"name": "bytesCol", "type": "BYTES"}
+        ],
+        "inputs": [
+          [1, 14, 3.0, 5.176518e16, "1e505", true, "1970-01-01 01:02:03.456", "123", "lyons", "DEADBEEF"],
+          [2, 21, 4.0, 4.608155e11, "1e350", true, "1999-01-08 22:05:46", "123", "onan", "DE12BEEF"],
+          [3, 14, 5.0, 1.249261e11, "1e505", false, "1999-01-08 04:05:06.001", "123", "rudvalis", "A000"],
+          [4, 21, 5.0, 8.677557e19, "1e404", false, "5760-01-01 04:05:06", "123", "janko", "FEEE"],
+          [1, 41, 2.0, 4.154786e33, "1e505", true, "2022-01-02 03:45:00", "123", "baby", "1000"],
+          [2, 46, 1.0, 8.080171e53, "1e401", false, "1969-12-31 23:59:59", "123", "monster", "00"]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "test (1) Pinot ScalarFunction (md5); (2) Pinot ScalarFunction multiple argument (dateTrunc, substr); (3) Pinot filter (regexpLike), (4) Pinot transform (dateTimeConvert) ",
+        "sql": "SELECT md5(bytesCol), substr(stringCol, 5), regexpExtract(stringCol, '([\\w]+).*') FROM {tbl} WHERE regexpLike(stringCol, '.*')",
+        "outputs": [
+          ["2f249230a8e7c2bf6005ccd2679259ec", "", "lyons"],
+          ["a85a5fd494d9a538e22b696159931c1b", "", "onan"],
+          ["982569213f522d8fce898806d0a2c357", "lis", "rudvalis"],
+          ["ae04dbae988ab45ebfba84c0c3612a50", "", "janko"],
+          ["d479436bd32066b25886f9920c7b7ccf", "", "baby"],
+          ["93b885adfe0da089cdf634904fd59f71", "er", "monster"]
+        ]
+      },
+      {
+        "description": "test Pinot function variances above, but also mixed in with intermediate stage transfer",
+        "sql": "SELECT md5(a.bytesCol), substr(b.stringCol, 5), regexpExtract(a.stringCol, '([\\w]+).*') FROM {tbl} AS a JOIN {tbl} AS b ON a.intCol = b.intCol WHERE regexpLike(a.stringCol, b.stringCol)",
+        "outputs": [
+          ["2f249230a8e7c2bf6005ccd2679259ec", "", "lyons"],
+          ["a85a5fd494d9a538e22b696159931c1b", "", "onan"],
+          ["982569213f522d8fce898806d0a2c357", "lis", "rudvalis"],
+          ["ae04dbae988ab45ebfba84c0c3612a50", "", "janko"],
+          ["d479436bd32066b25886f9920c7b7ccf", "", "baby"],
+          ["93b885adfe0da089cdf634904fd59f71", "er", "monster"]
+        ]
+      },
+      {
+        "ignored": true,
+        "comment": "problematic dateTimeConvert with weird exception on leaf stage",
+        "sql": "SELECT dateTimeConvert(timestampStringCol, '1:MILLISECONDS:EPOCH', '1:MINUTES:EPOCH', '30:MINUTES') FROM {tbl}",
+        "outputs": [
+          []
+        ]
+      },
+      {
+        "ignored": true,
+        "comment": "problematic round function not producing proper decimal results type",
+        "sql": "SELECT ROUND(longCol, 2) FROM {tbl}",
+        "outputs": [
+          []
+        ]
+      },
+      {
+        "ignored": true,
+        "comment": "dateTrunc returns round up instead of round down results",
+        "sql": "SELECT dateTrunc('DAY', b.timestampCol) FROM {tbl}",
+        "outputs": [
+          []
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org