You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/05/28 09:42:12 UTC

[incubator-pinot] branch support_case_when_statement updated (c7c941a -> 5a97ea4)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch support_case_when_statement
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard c7c941a  Adding transform function support for case-when-else
     new 5a97ea4  Adding transform function support for case-when-else

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c7c941a)
            \
             N -- N -- N   refs/heads/support_case_when_statement (5a97ea4)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../transform/function/CaseTransformFunction.java  |  11 +-
 .../function/CaseTransformFunctionTest.java        | 344 ++++++++++++++++++++-
 2 files changed, 336 insertions(+), 19 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding transform function support for case-when-else

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch support_case_when_statement
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5a97ea4025113740e036d6888eeee8963f945b0c
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed May 27 22:15:59 2020 -0700

    Adding transform function support for case-when-else
---
 .../common/function/TransformFunctionType.java     |   8 +
 .../pinot/sql/parsers/CalciteSqlCompilerTest.java  |  20 +-
 .../function/BinaryOperatorTransformFunction.java  | 113 +++++
 .../transform/function/CaseTransformFunction.java  | 129 +++++
 .../function/EqualsTransformFunction.java          |  24 +
 .../GreaterThanOrEqualTransformFunction.java       |  32 ++
 .../function/GreaterThanTransformFunction.java     |  32 ++
 .../function/LessThanOrEqualTransformFunction.java |  32 ++
 .../function/LessThanTransformFunction.java        |  32 ++
 .../function/LiteralTransformFunction.java         |  36 +-
 .../function/NotEqualsTransformFunction.java       |  24 +
 .../function/TransformFunctionFactory.java         |   9 +
 .../function/BaseTransformFunctionTest.java        |  15 +
 .../BinaryOperatorTransformFunctionTest.java       | 103 ++++
 .../function/CaseTransformFunctionTest.java        | 522 +++++++++++++++++++++
 .../function/EqualsTransformFunctionTest.java      |  52 ++
 .../GreaterThanOrEqualTransformFunctionTest.java   |  52 ++
 .../function/GreaterThanTransformFunctionTest.java |  52 ++
 .../LessThanOrEqualTransformFunctionTest.java      |  52 ++
 .../function/LessThanTransformFunctionTest.java    |  52 ++
 .../function/NotEqualsTransformFunctionTest.java   |  52 ++
 .../tests/OfflineClusterIntegrationTest.java       |  31 +-
 22 files changed, 1452 insertions(+), 22 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index f8081ce..86d5c8d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -38,7 +38,15 @@ public enum TransformFunctionType {
   LN("ln"),
   SQRT("sqrt"),
 
+  EQUALS("equals"),
+  NOT_EQUALS("not_equals"),
+  GREATER_THAN("greater_than"),
+  GREATER_THAN_OR_EQUAL("greater_than_or_equal"),
+  LESS_THAN("less_than"),
+  LESS_THAN_OR_EQUAL("less_than_or_equal"),
+
   CAST("cast"),
+  CASE("case"),
   JSONEXTRACTSCALAR("jsonExtractScalar"),
   JSONEXTRACTKEY("jsonExtractKey"),
   TIMECONVERT("timeConvert"),
diff --git a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
index 80ef72b..b590be5 100644
--- a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
@@ -52,13 +52,9 @@ public class CalciteSqlCompilerTest {
   @Test
   public void testCaseWhenStatements() {
     PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(
-        "SELECT OrderID, Quantity,\n"
-            + "CASE\n"
-            + "    WHEN Quantity > 30 THEN 'The quantity is greater than 30'\n"
-            + "    WHEN Quantity = 30 THEN 'The quantity is 30'\n"
-            + "    ELSE 'The quantity is under 30'\n"
-            + "END AS QuantityText\n"
-            + "FROM OrderDetails");
+        "SELECT OrderID, Quantity,\n" + "CASE\n" + "    WHEN Quantity > 30 THEN 'The quantity is greater than 30'\n"
+            + "    WHEN Quantity = 30 THEN 'The quantity is 30'\n" + "    ELSE 'The quantity is under 30'\n"
+            + "END AS QuantityText\n" + "FROM OrderDetails");
     Assert.assertEquals(pinotQuery.getSelectList().get(0).getIdentifier().getName(), "OrderID");
     Assert.assertEquals(pinotQuery.getSelectList().get(1).getIdentifier().getName(), "Quantity");
     Function asFunc = pinotQuery.getSelectList().get(2).getFunctionCall();
@@ -79,14 +75,8 @@ public class CalciteSqlCompilerTest {
     Assert.assertEquals(caseFunc.getOperands().get(4).getLiteral().getFieldValue(), "The quantity is under 30");
 
     pinotQuery = CalciteSqlParser.compileToPinotQuery(
-        "SELECT Quantity,\n"
-            + "SUM(CASE\n"
-            + "    WHEN Quantity > 30 THEN 3\n"
-            + "    WHEN Quantity > 20 THEN 2\n"
-            + "    WHEN Quantity > 10 THEN 1\n"
-            + "    ELSE 0\n"
-            + "END) AS new_sum_quant\n"
-            + "FROM OrderDetails");
+        "SELECT Quantity,\n" + "SUM(CASE\n" + "    WHEN Quantity > 30 THEN 3\n" + "    WHEN Quantity > 20 THEN 2\n"
+            + "    WHEN Quantity > 10 THEN 1\n" + "    ELSE 0\n" + "END) AS new_sum_quant\n" + "FROM OrderDetails");
     Assert.assertEquals(pinotQuery.getSelectList().get(0).getIdentifier().getName(), "Quantity");
     asFunc = pinotQuery.getSelectList().get(1).getFunctionCall();
     Assert.assertEquals(asFunc.getOperator(), SqlKind.AS.name());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
new file mode 100644
index 0000000..d97a5d4
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * BinaryOperatorTransformFunction abstracts common functions for binary operators (=, !=, >=, >, <=, <).
+ * The results are in boolean format and stored as an integer array in which 1 represents true and 0 represents false.
+ *
+ * <p>NOTE: the value type used for the comparison is taken from the left operand only; the right operand is read
+ * through the accessor of that same type.
+ */
+public abstract class BinaryOperatorTransformFunction extends BaseTransformFunction {
+
+  protected TransformFunction _leftTransformFunction;
+  protected TransformFunction _rightTransformFunction;
+  // Result buffer: allocated on first use and then reused for every block
+  protected int[] _results;
+
+  /**
+   * Stores the two operands of the comparison.
+   *
+   * @param arguments operands of the comparison: left operand first, right operand second
+   * @param dataSourceMap not used by this class
+   * @throws IllegalArgumentException if the number of arguments is not exactly 2
+   */
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    // Check that there are exactly 2 arguments
+    if (arguments.size() != 2) {
+      throw new IllegalArgumentException("Exactly 2 arguments are required for " + getName() + " transform function");
+    }
+    _leftTransformFunction = arguments.get(0);
+    _rightTransformFunction = arguments.get(1);
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return INT_SV_NO_DICTIONARY_METADATA;
+  }
+
+  /**
+   * Compares the two operands for each document of the block and stores 1 or 0 in {@link #_results}.
+   *
+   * <p>Only the first {@code projectionBlock.getNumDocs()} entries are written; the remainder of the buffer may still
+   * hold values from a previous block.
+   *
+   * @param projectionBlock block on which both operands are evaluated
+   * @throws IllegalStateException if the data type of the left operand is not one of the handled types
+   */
+  protected void fillResultArray(ProjectionBlock projectionBlock) {
+    if (_results == null) {
+      _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+    FieldSpec.DataType dataType = _leftTransformFunction.getResultMetadata().getDataType();
+    int length = projectionBlock.getNumDocs();
+    switch (dataType) {
+      case INT:
+        int[] leftIntValues = _leftTransformFunction.transformToIntValuesSV(projectionBlock);
+        int[] rightIntValues = _rightTransformFunction.transformToIntValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          // The static compare methods give the same ordering as compareTo without boxing each value
+          _results[i] = getBinaryFuncResult(Integer.compare(leftIntValues[i], rightIntValues[i]));
+        }
+        break;
+      case LONG:
+        long[] leftLongValues = _leftTransformFunction.transformToLongValuesSV(projectionBlock);
+        long[] rightLongValues = _rightTransformFunction.transformToLongValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(Long.compare(leftLongValues[i], rightLongValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] leftFloatValues = _leftTransformFunction.transformToFloatValuesSV(projectionBlock);
+        float[] rightFloatValues = _rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(Float.compare(leftFloatValues[i], rightFloatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] leftDoubleValues = _leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
+        double[] rightDoubleValues = _rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(Double.compare(leftDoubleValues[i], rightDoubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] leftStringValues = _leftTransformFunction.transformToStringValuesSV(projectionBlock);
+        String[] rightStringValues = _rightTransformFunction.transformToStringValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(leftStringValues[i].compareTo(rightStringValues[i]));
+        }
+        break;
+      case BYTES:
+        byte[][] leftBytesValues = _leftTransformFunction.transformToBytesValuesSV(projectionBlock);
+        byte[][] rightBytesValues = _rightTransformFunction.transformToBytesValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] =
+              getBinaryFuncResult((new ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
+        }
+        break;
+      // NOTE: Multi-value columns are not comparable, so we should not reach here
+      default:
+        throw new IllegalStateException("Unsupported data type for binary operator: " + dataType);
+    }
+  }
+
+  /**
+   * Maps a three-way comparison result to the outcome of the concrete operator.
+   *
+   * @param result negative, zero or positive when the left value is less than, equal to or greater than the right
+   * @return 1 if the operator holds for that comparison result, 0 otherwise
+   */
+  abstract int getBinaryFuncResult(int result);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
new file mode 100644
index 0000000..3928287
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+
+
+/**
+ * Transform function for {@code CASE WHEN ... THEN ... ELSE ... END}.
+ *
+ * <p>Arguments are laid out as: all WHEN conditions first, then the THEN values in the same order, and finally the
+ * ELSE value. THEN and ELSE values must be literals. A WHEN condition is satisfied for a document when it evaluates
+ * to 1; the first satisfied condition wins, and the ELSE value is used when no condition is satisfied.
+ */
+public class CaseTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "case";
+
+  private int _numberWhenStatements;
+  private final List<TransformFunction> _whenStatements = new ArrayList<>();
+  // Index 0 holds the ELSE literal, index (i + 1) holds the THEN literal of the i-th WHEN condition
+  private final List<LiteralTransformFunction> _elseThenStatements = new ArrayList<>();
+  private TransformResultMetadata _resultMetadata;
+  // Numeric values of the literals, indexed like _elseThenStatements and parsed on first use only
+  private double[] _numericLiterals;
+  private boolean[] _numericLiteralParsed;
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  /**
+   * Splits the arguments into WHEN conditions, THEN literals and the ELSE literal.
+   *
+   * @param arguments n WHEN conditions, followed by n THEN literals, followed by 1 ELSE literal (n >= 1)
+   * @param dataSourceMap not used by this function
+   * @throws IllegalArgumentException if the number of arguments is even or less than 3
+   * @throws ClassCastException if a THEN or ELSE argument is not a {@link LiteralTransformFunction}
+   */
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    // Check that there is an odd number of arguments, and at least 3 of them
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add((LiteralTransformFunction) arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add((LiteralTransformFunction) arguments.get(i));
+    }
+    // The result metadata of the whole expression is taken from the ELSE literal
+    _resultMetadata = _elseThenStatements.get(0).getResultMetadata();
+    _numericLiterals = new double[_elseThenStatements.size()];
+    _numericLiteralParsed = new boolean[_elseThenStatements.size()];
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return _resultMetadata;
+  }
+
+  /**
+   * Determines, for each document of the block, which literal applies.
+   *
+   * @param projectionBlock block on which the WHEN conditions are evaluated
+   * @return array whose first {@code projectionBlock.getNumDocs()} entries are indexes into _elseThenStatements:
+   *         0 for ELSE, (i + 1) for the THEN value of the first satisfied WHEN condition i
+   */
+  private int[] getSelectedArray(ProjectionBlock projectionBlock) {
+    int numDocs = projectionBlock.getNumDocs();
+    int[] selected = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      int[] conditions = _whenStatements.get(i).transformToIntValuesSV(projectionBlock);
+      // Only the first numDocs entries belong to this block; later entries may hold values from a previous block
+      for (int j = 0; j < numDocs; j++) {
+        if (selected[j] == 0 && conditions[j] == 1) {
+          selected[j] = i + 1;
+        }
+      }
+    }
+    return selected;
+  }
+
+  /**
+   * Returns the literal at the given index as a double, parsing the literal string only the first time it is needed.
+   *
+   * @param index index into _elseThenStatements
+   * @throws NumberFormatException if the literal is not a valid number
+   */
+  private double getNumericLiteral(int index) {
+    if (!_numericLiteralParsed[index]) {
+      _numericLiterals[index] = Double.parseDouble(_elseThenStatements.get(index).getLiteral());
+      _numericLiteralParsed[index] = true;
+    }
+    return _numericLiterals[index];
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    int[] results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < numDocs; i++) {
+      results[i] = (int) getNumericLiteral(selected[i]);
+    }
+    return results;
+  }
+
+  @Override
+  public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    long[] results = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < numDocs; i++) {
+      results[i] = (long) getNumericLiteral(selected[i]);
+    }
+    return results;
+  }
+
+  @Override
+  public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    float[] results = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < numDocs; i++) {
+      results[i] = (float) getNumericLiteral(selected[i]);
+    }
+    return results;
+  }
+
+  @Override
+  public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    double[] results = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < numDocs; i++) {
+      results[i] = getNumericLiteral(selected[i]);
+    }
+    return results;
+  }
+
+  @Override
+  public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    String[] results = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < numDocs; i++) {
+      results[i] = _elseThenStatements.get(selected[i]).getLiteral();
+    }
+    return results;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunction.java
new file mode 100644
index 0000000..1bd5e58
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunction.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+/**
+ * Transform function for the {@code =} comparison: yields 1 when both operands compare as equal, 0 otherwise.
+ */
+public class EqualsTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.EQUALS.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    // The returned array is the buffer owned by the base class, not a copy
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+
+  /**
+   * @param result three-way comparison of the left operand against the right operand
+   * @return 1 if the operands are equal (comparison result is 0), 0 otherwise
+   */
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result == 0) ? 1 : 0;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunction.java
new file mode 100644
index 0000000..5afbdd2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunction.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+/**
+ * Transform function for the {@code >=} comparison: yields 1 when the left operand is not less than the right, else 0.
+ */
+public class GreaterThanOrEqualTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    super.init(arguments, dataSourceMap);
+  }
+
+  /**
+   * @param result three-way comparison of the left operand against the right operand
+   * @return 1 if the comparison result is zero or positive, 0 otherwise
+   */
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result >= 0) ? 1 : 0;
+  }
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.GREATER_THAN_OR_EQUAL.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    // The returned array is the buffer owned by the base class, not a copy
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunction.java
new file mode 100644
index 0000000..fc36ded
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunction.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+/**
+ * Transform function for the {@code >} comparison: yields 1 when the left operand is greater than the right, else 0.
+ */
+public class GreaterThanTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    super.init(arguments, dataSourceMap);
+  }
+
+  /**
+   * @param result three-way comparison of the left operand against the right operand
+   * @return 1 if the comparison result is strictly positive, 0 otherwise
+   */
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result > 0) ? 1 : 0;
+  }
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.GREATER_THAN.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    // The returned array is the buffer owned by the base class, not a copy
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunction.java
new file mode 100644
index 0000000..21c5fce
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunction.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+/**
+ * Transform function for the {@code <=} comparison: yields 1 when the left operand is not greater than the right, else 0.
+ */
+public class LessThanOrEqualTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    super.init(arguments, dataSourceMap);
+  }
+
+  /**
+   * @param result three-way comparison of the left operand against the right operand
+   * @return 1 if the comparison result is zero or negative, 0 otherwise
+   */
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result <= 0) ? 1 : 0;
+  }
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.LESS_THAN_OR_EQUAL.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    // The returned array is the buffer owned by the base class, not a copy
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunction.java
new file mode 100644
index 0000000..717597c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunction.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+/**
+ * Transform function for the {@code <} comparison: yields 1 when the left operand is less than the right, else 0.
+ */
+public class LessThanTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    super.init(arguments, dataSourceMap);
+  }
+
+  /**
+   * @param result three-way comparison of the left operand against the right operand
+   * @return 1 if the comparison result is strictly negative, 0 otherwise
+   */
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result < 0) ? 1 : 0;
+  }
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.LESS_THAN.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    // The returned array is the buffer owned by the base class, not a copy
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
index 4317a2b..43e8dfc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
@@ -26,6 +26,7 @@ import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.utils.BytesUtils;
 
 
 /**
@@ -35,6 +36,11 @@ import org.apache.pinot.core.segment.index.readers.Dictionary;
 public class LiteralTransformFunction implements TransformFunction {
   private final String _literal;
   private String[] _result;
+  private int[] _intResult;
+  private long[] _longResult;
+  private float[] _floatResult;
+  private double[] _doubleResult;
+  private byte[][] _bytesResult;
 
   public LiteralTransformFunction(String literal) {
     _literal = literal;
@@ -75,22 +81,38 @@ public class LiteralTransformFunction implements TransformFunction {
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_intResult == null) {
+      _intResult = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_intResult, Integer.parseInt(_literal));
+    }
+    return _intResult;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_longResult == null) {
+      _longResult = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_longResult, Long.parseLong(_literal));
+    }
+    return _longResult;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_floatResult == null) {
+      _floatResult = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_floatResult, Float.parseFloat(_literal));
+    }
+    return _floatResult;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_doubleResult == null) {
+      _doubleResult = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_doubleResult, Double.parseDouble(_literal));
+    }
+    return _doubleResult;
   }
 
   @Override
@@ -104,7 +126,11 @@ public class LiteralTransformFunction implements TransformFunction {
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_bytesResult == null) {
+      _bytesResult = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+      Arrays.fill(_bytesResult, BytesUtils.toBytes(_literal));
+    }
+    return _bytesResult;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunction.java
new file mode 100644
index 0000000..32ad40f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunction.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+/**
+ * Transform function for the {@code !=} comparison: yields 1 when the operands do not compare as equal, 0 otherwise.
+ */
+public class NotEqualsTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.NOT_EQUALS.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    // The returned array is the buffer owned by the base class, not a copy
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+
+  /**
+   * @param result three-way comparison of the left operand against the right operand
+   * @return 1 if the operands differ (comparison result is not 0), 0 otherwise
+   */
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result != 0) ? 1 : 0;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index e9bd6bc..059db49 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -73,6 +73,15 @@ public class TransformFunctionFactory {
           put(TransformFunctionType.ARRAYLENGTH.getName().toLowerCase(), ArrayLengthTransformFunction.class);
           put(TransformFunctionType.VALUEIN.getName().toLowerCase(), ValueInTransformFunction.class);
           put(TransformFunctionType.MAPVALUE.getName().toLowerCase(), MapValueTransformFunction.class);
+
+          put(TransformFunctionType.CASE.getName().toLowerCase(), CaseTransformFunction.class);
+
+          put(TransformFunctionType.EQUALS.getName().toLowerCase(), EqualsTransformFunction.class);
+          put(TransformFunctionType.NOT_EQUALS.getName().toLowerCase(), NotEqualsTransformFunction.class);
+          put(TransformFunctionType.GREATER_THAN.getName().toLowerCase(), GreaterThanTransformFunction.class);
+          put(TransformFunctionType.GREATER_THAN_OR_EQUAL.getName().toLowerCase(), GreaterThanOrEqualTransformFunction.class);
+          put(TransformFunctionType.LESS_THAN.getName().toLowerCase(), LessThanTransformFunction.class);
+          put(TransformFunctionType.LESS_THAN_OR_EQUAL.getName().toLowerCase(), LessThanOrEqualTransformFunction.class);
         }
       };
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index ab9949c..1649d03 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -154,6 +154,26 @@ public abstract class BaseTransformFunctionTest {
         new DocIdSetOperator(new MatchAllFilterOperator(NUM_ROWS), DocIdSetPlanNode.MAX_DOC_PER_CALL)).nextBlock();
   }
 
+  /**
+   * Runs every single-value accessor of {@code transformFunction} (int, long, float, double and String) against
+   * {@code _projectionBlock} and asserts that each of the first NUM_ROWS results equals {@code expectedValues}.
+   */
+  protected void testTransformFunction(TransformFunction transformFunction, int[] expectedValues) {
+    int[] asInts = transformFunction.transformToIntValuesSV(_projectionBlock);
+    long[] asLongs = transformFunction.transformToLongValuesSV(_projectionBlock);
+    float[] asFloats = transformFunction.transformToFloatValuesSV(_projectionBlock);
+    double[] asDoubles = transformFunction.transformToDoubleValuesSV(_projectionBlock);
+    String[] asStrings = transformFunction.transformToStringValuesSV(_projectionBlock);
+    for (int row = 0; row < NUM_ROWS; row++) {
+      int expected = expectedValues[row];
+      Assert.assertEquals(asInts[row], expected);
+      Assert.assertEquals(asLongs[row], expected);
+      Assert.assertEquals(asFloats[row], (float) expected);
+      Assert.assertEquals(asDoubles[row], (double) expected);
+      Assert.assertEquals(asStrings[row], Integer.toString(expected));
+    }
+  }
+
   protected void testTransformFunction(TransformFunction transformFunction, double[] expectedValues) {
     int[] intValues = transformFunction.transformToIntValuesSV(_projectionBlock);
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunctionTest.java
new file mode 100644
index 0000000..59d7b0d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunctionTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.Locale;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+/**
+ * Base test for the binary operator transform functions. A subclass names the function under test and supplies the
+ * int result expected for each operand type; this class evaluates the function on every single-value column against
+ * a literal taken from row 0 of that column and compares the output with those expectations.
+ */
+public abstract class BinaryOperatorTransformFunctionTest extends BaseTransformFunctionTest {
+
+  /** Returns the int result expected for INT operands {@code value} and {@code toCompare}. */
+  abstract int getExpectedValue(int value, int toCompare);
+
+  /** Returns the int result expected for LONG operands {@code value} and {@code toCompare}. */
+  abstract int getExpectedValue(long value, long toCompare);
+
+  /** Returns the int result expected for FLOAT operands {@code value} and {@code toCompare}. */
+  abstract int getExpectedValue(float value, float toCompare);
+
+  /** Returns the int result expected for DOUBLE operands {@code value} and {@code toCompare}. */
+  abstract int getExpectedValue(double value, double toCompare);
+
+  /** Returns the int result expected for STRING operands {@code value} and {@code toCompare}. */
+  abstract int getExpectedValue(String value, String toCompare);
+
+  /** Returns the name of the transform function under test, as it is written in an expression. */
+  abstract String getFuncName();
+
+  @Test
+  public void testBinaryOperatorTransformFunction() {
+    TransformFunction transformFunction = compileFunctionUnderTest("%s(%s, %d)", INT_SV_COLUMN, _intSVValues[0]);
+    Assert.assertEquals(transformFunction.getName(), getFuncName().toLowerCase());
+    int[] expectedIntValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedIntValues[i] = getExpectedValue(_intSVValues[i], _intSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedIntValues);
+
+    transformFunction = compileFunctionUnderTest("%s(%s, %d)", LONG_SV_COLUMN, _longSVValues[0]);
+    int[] expectedLongValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedLongValues[i] = getExpectedValue(_longSVValues[i], _longSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedLongValues);
+
+    transformFunction = compileFunctionUnderTest("%s(%s, %f)", FLOAT_SV_COLUMN, _floatSVValues[0]);
+    int[] expectedFloatValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedFloatValues[i] = getExpectedValue(_floatSVValues[i], _floatSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedFloatValues);
+
+    transformFunction = compileFunctionUnderTest("%s(%s, %.20f)", DOUBLE_SV_COLUMN, _doubleSVValues[0]);
+    int[] expectedDoubleValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedDoubleValues[i] = getExpectedValue(_doubleSVValues[i], _doubleSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedDoubleValues);
+
+    transformFunction = compileFunctionUnderTest("%s(%s, '%s')", STRING_SV_COLUMN, _stringSVValues[0]);
+    int[] expectedStringValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedStringValues[i] = getExpectedValue(_stringSVValues[i], _stringSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedStringValues);
+  }
+
+  @Test(dataProvider = "testIllegalArguments", expectedExceptions = {BadQueryRequestException.class})
+  public void testIllegalArguments(String expressionStr) {
+    TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(expressionStr);
+    TransformFunctionFactory.get(expression, _dataSourceMap);
+  }
+
+  @DataProvider(name = "testIllegalArguments")
+  public Object[][] testIllegalArguments() {
+    return new Object[][]{new Object[]{String.format("%s(%s)", getFuncName(),
+        INT_SV_COLUMN)}, new Object[]{String.format("%s(%s, %s, %s)", getFuncName(), LONG_SV_COLUMN, INT_SV_COLUMN,
+        STRING_SV_COLUMN)}};
+  }
+
+  /**
+   * Builds the transform function for the expression obtained by filling {@code expressionFormat} with the name of
+   * the function under test, {@code column} and {@code literal}, in that order. The expression is formatted with
+   * {@link Locale#ROOT} so that numeric literals always use '.' as the decimal separator; with the default locale,
+   * {@code %f} can emit ',' instead, which changes the number of comma-separated arguments in the expression.
+   */
+  private TransformFunction compileFunctionUnderTest(String expressionFormat, String column, Object literal) {
+    String expressionStr = String.format(Locale.ROOT, expressionFormat, getFuncName(), column, literal);
+    TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(expressionStr);
+    return TransformFunctionFactory.get(expression, _dataSourceMap);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java
new file mode 100644
index 0000000..38ac139
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.Locale;
+import java.util.Random;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the CASE transform function with every binary operator predicate on every single-value column, using INT,
+ * DOUBLE and STRING then/else literals. Each predicate compares a column against the value that the same column
+ * holds at a randomly chosen row.
+ */
+public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
+
+  // Row whose values serve as the right-hand operand of every predicate.
+  private final int INDEX_TO_COMPARE = new Random(System.currentTimeMillis()).nextInt(NUM_ROWS);
+
+  private final TransformFunctionType[] BINARY_OPERATOR_TRANSFORM_FUNCTIONS =
+      new TransformFunctionType[]{TransformFunctionType.EQUALS, TransformFunctionType.NOT_EQUALS,
+          TransformFunctionType.GREATER_THAN, TransformFunctionType.GREATER_THAN_OR_EQUAL,
+          TransformFunctionType.LESS_THAN, TransformFunctionType.LESS_THAN_OR_EQUAL};
+
+  // Columns every predicate is evaluated on, in the order they are tested.
+  private final String[] SV_COLUMNS =
+      new String[]{INT_SV_COLUMN, LONG_SV_COLUMN, FLOAT_SV_COLUMN, DOUBLE_SV_COLUMN, STRING_SV_COLUMN};
+
+  @Test
+  public void testCaseTransformFunctionWithIntResults() {
+    for (TransformFunctionType functionType : BINARY_OPERATOR_TRANSFORM_FUNCTIONS) {
+      for (String column : SV_COLUMNS) {
+        testCaseQueryWithIntResults(buildPredicate(functionType, column),
+            getExpectedIntResults(column, functionType));
+      }
+    }
+  }
+
+  @Test
+  public void testCaseTransformFunctionWithDoubleResults() {
+    for (TransformFunctionType functionType : BINARY_OPERATOR_TRANSFORM_FUNCTIONS) {
+      for (String column : SV_COLUMNS) {
+        testCaseQueryWithDoubleResults(buildPredicate(functionType, column),
+            getExpectedDoubleResults(column, functionType));
+      }
+    }
+  }
+
+  @Test
+  public void testCaseTransformFunctionWithStringResults() {
+    for (TransformFunctionType functionType : BINARY_OPERATOR_TRANSFORM_FUNCTIONS) {
+      for (String column : SV_COLUMNS) {
+        testCaseQueryWithStringResults(buildPredicate(functionType, column),
+            getExpectedStringResults(column, functionType));
+      }
+    }
+  }
+
+  private void testCaseQueryWithIntResults(String predicate, int[] expectedValues) {
+    testTransformFunction(buildCaseTransformFunction(predicate, "100", "10"), expectedValues);
+  }
+
+  private void testCaseQueryWithDoubleResults(String predicate, double[] expectedValues) {
+    testTransformFunction(buildCaseTransformFunction(predicate, "100.0", "10.0"), expectedValues);
+  }
+
+  private void testCaseQueryWithStringResults(String predicate, String[] expectedValues) {
+    testTransformFunction(buildCaseTransformFunction(predicate, "'aaa'", "'bbb'"), expectedValues);
+  }
+
+  /**
+   * Builds {@code CASE(predicate, thenLiteral, elseLiteral)}, asserts that the factory resolves it to a
+   * {@link CaseTransformFunction} with the expected name, and returns the function.
+   */
+  private TransformFunction buildCaseTransformFunction(String predicate, String thenLiteral, String elseLiteral) {
+    TransformExpressionTree expression = TransformExpressionTree
+        .compileToExpressionTree(String.format("CASE(%s, %s, %s)", predicate, thenLiteral, elseLiteral));
+    // The predicate is also compiled on its own and installed as the first child of the CASE expression.
+    expression.getChildren().set(0, TransformExpressionTree.compileToExpressionTree(predicate));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof CaseTransformFunction);
+    Assert.assertEquals(transformFunction.getName(), CaseTransformFunction.FUNCTION_NAME);
+    return transformFunction;
+  }
+
+  @Test(dataProvider = "testIllegalArguments", expectedExceptions = {BadQueryRequestException.class})
+  public void testIllegalArguments(String expressionStr) {
+    TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(expressionStr);
+    TransformFunctionFactory.get(expression, _dataSourceMap);
+  }
+
+  @DataProvider(name = "testIllegalArguments")
+  public Object[][] testIllegalArguments() {
+    return new Object[][]{new Object[]{String.format("case(%s)", INT_SV_COLUMN)}, new Object[]{String.format(
+        "case(%s, %s)", LONG_SV_COLUMN, 10)}};
+  }
+
+  /**
+   * Returns the predicate {@code functionName(column, literal)}, where the literal is the value of {@code column}
+   * at INDEX_TO_COMPARE. Literals are formatted with {@link Locale#ROOT} so that numeric values always use '.' as
+   * the decimal separator; with the default locale, {@code %f} can emit ',' and corrupt the argument list.
+   */
+  private String buildPredicate(TransformFunctionType functionType, String column) {
+    String literal;
+    switch (column) {
+      case INT_SV_COLUMN:
+        literal = String.format(Locale.ROOT, "%d", _intSVValues[INDEX_TO_COMPARE]);
+        break;
+      case LONG_SV_COLUMN:
+        literal = String.format(Locale.ROOT, "%d", _longSVValues[INDEX_TO_COMPARE]);
+        break;
+      case FLOAT_SV_COLUMN:
+        literal = String.format(Locale.ROOT, "%f", _floatSVValues[INDEX_TO_COMPARE]);
+        break;
+      case DOUBLE_SV_COLUMN:
+        literal = String.format(Locale.ROOT, "%.20f", _doubleSVValues[INDEX_TO_COMPARE]);
+        break;
+      case STRING_SV_COLUMN:
+        literal = String.format(Locale.ROOT, "'%s'", _stringSVValues[INDEX_TO_COMPARE]);
+        break;
+      default:
+        throw new IllegalStateException("Not supported column - " + column);
+    }
+    return String.format("%s(%s, %s)", functionType.getName(), column, literal);
+  }
+
+  private int[] getExpectedIntResults(String column, TransformFunctionType type) {
+    int[] result = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      result[i] = rowMatches(column, type, i) ? 100 : 10;
+    }
+    return result;
+  }
+
+  private double[] getExpectedDoubleResults(String column, TransformFunctionType type) {
+    double[] result = new double[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      result[i] = rowMatches(column, type, i) ? 100 : 10;
+    }
+    return result;
+  }
+
+  private String[] getExpectedStringResults(String column, TransformFunctionType type) {
+    String[] result = new String[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      result[i] = rowMatches(column, type, i) ? "aaa" : "bbb";
+    }
+    return result;
+  }
+
+  /**
+   * Returns whether row {@code row} of {@code column} satisfies the {@code type} comparison against the value of
+   * the same column at INDEX_TO_COMPARE. INT values are compared as long and FLOAT values as double (both
+   * conversions are exact); STRING values are compared through {@link String#compareTo(String)}.
+   *
+   * @throws IllegalStateException if {@code column} is not one of the single-value test columns
+   */
+  private boolean rowMatches(String column, TransformFunctionType type, int row) {
+    switch (column) {
+      case INT_SV_COLUMN:
+        return evaluateIntegral(type, _intSVValues[row], _intSVValues[INDEX_TO_COMPARE]);
+      case LONG_SV_COLUMN:
+        return evaluateIntegral(type, _longSVValues[row], _longSVValues[INDEX_TO_COMPARE]);
+      case FLOAT_SV_COLUMN:
+        return evaluateFloating(type, _floatSVValues[row], _floatSVValues[INDEX_TO_COMPARE]);
+      case DOUBLE_SV_COLUMN:
+        return evaluateFloating(type, _doubleSVValues[row], _doubleSVValues[INDEX_TO_COMPARE]);
+      case STRING_SV_COLUMN:
+        return evaluateIntegral(type, _stringSVValues[row].compareTo(_stringSVValues[INDEX_TO_COMPARE]), 0);
+      default:
+        throw new IllegalStateException("Not supported column - " + column);
+    }
+  }
+
+  /** Applies the {@code type} comparison to two integral operands. */
+  private static boolean evaluateIntegral(TransformFunctionType type, long value, long toCompare) {
+    switch (type) {
+      case EQUALS:
+        return value == toCompare;
+      case NOT_EQUALS:
+        return value != toCompare;
+      case GREATER_THAN:
+        return value > toCompare;
+      case GREATER_THAN_OR_EQUAL:
+        return value >= toCompare;
+      case LESS_THAN:
+        return value < toCompare;
+      case LESS_THAN_OR_EQUAL:
+        return value <= toCompare;
+      default:
+        throw new IllegalStateException("Not supported type - " + type);
+    }
+  }
+
+  /** Applies the {@code type} comparison to two floating-point operands using the primitive operators. */
+  private static boolean evaluateFloating(TransformFunctionType type, double value, double toCompare) {
+    switch (type) {
+      case EQUALS:
+        return value == toCompare;
+      case NOT_EQUALS:
+        return value != toCompare;
+      case GREATER_THAN:
+        return value > toCompare;
+      case GREATER_THAN_OR_EQUAL:
+        return value >= toCompare;
+      case LESS_THAN:
+        return value < toCompare;
+      case LESS_THAN_OR_EQUAL:
+        return value <= toCompare;
+      default:
+        throw new IllegalStateException("Not supported type - " + type);
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunctionTest.java
new file mode 100644
index 0000000..6e9dab5
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class EqualsTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+  // Expected results for the "equals" operator: every overload yields 1 when value equals toCompare, else 0.
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value == toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value == toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value == toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value == toCompare) ? 1 : 0;
+  }
+  // Strings are matched by content through compareTo() (result == 0), not by reference identity.
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) == 0) ? 1 : 0;
+  }
+  // Name read from a fresh EqualsTransformFunction; presumably how the base test selects the function (confirm).
+  @Override
+  String getFuncName() {
+    return new EqualsTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunctionTest.java
new file mode 100644
index 0000000..8c26967
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class GreaterThanOrEqualTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+  // Expected results for the ">=" operator: every overload yields 1 when value >= toCompare, else 0.
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value >= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value >= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value >= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value >= toCompare) ? 1 : 0;
+  }
+  // Strings use compareTo() ordering: 1 when value sorts at or after toCompare lexicographically.
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) >= 0) ? 1 : 0;
+  }
+  // Name read from a fresh GreaterThanOrEqualTransformFunction; presumably selects the function (confirm).
+  @Override
+  String getFuncName() {
+    return new GreaterThanOrEqualTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunctionTest.java
new file mode 100644
index 0000000..ee0d5e5
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class GreaterThanTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+  // Expected results for the ">" operator: every overload yields 1 when value > toCompare, else 0.
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value > toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value > toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value > toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value > toCompare) ? 1 : 0;
+  }
+  // Strings use compareTo() ordering: 1 when value sorts strictly after toCompare lexicographically.
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) > 0) ? 1 : 0;
+  }
+  // Name read from a fresh GreaterThanTransformFunction; presumably selects the function under test (confirm).
+  @Override
+  String getFuncName() {
+    return new GreaterThanTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunctionTest.java
new file mode 100644
index 0000000..9c3569b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class LessThanOrEqualTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+  // Expected results for the "<=" operator: every overload yields 1 when value <= toCompare, else 0.
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value <= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value <= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value <= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value <= toCompare) ? 1 : 0;
+  }
+  // Strings use compareTo() ordering: 1 when value sorts at or before toCompare lexicographically.
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) <= 0) ? 1 : 0;
+  }
+  // Name read from a fresh LessThanOrEqualTransformFunction; presumably selects the function (confirm).
+  @Override
+  String getFuncName() {
+    return new LessThanOrEqualTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunctionTest.java
new file mode 100644
index 0000000..f04b06b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class LessThanTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+  // Expected results for the "<" operator: every overload yields 1 when value < toCompare, else 0.
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value < toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value < toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value < toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value < toCompare) ? 1 : 0;
+  }
+  // Strings use compareTo() ordering: 1 when value sorts strictly before toCompare lexicographically.
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) < 0) ? 1 : 0;
+  }
+  // Name read from a fresh LessThanTransformFunction; presumably selects the function under test (confirm).
+  @Override
+  String getFuncName() {
+    return new LessThanTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunctionTest.java
new file mode 100644
index 0000000..7567730
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class NotEqualsTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+  // Expected results for the "not equals" operator: every overload yields 1 when value differs, else 0.
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value != toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value != toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value != toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value != toCompare) ? 1 : 0;
+  }
+  // Strings are compared by content through compareTo() (result != 0), not by reference identity.
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) != 0) ? 1 : 0;
+  }
+  // Name read from a fresh NotEqualsTransformFunction; presumably selects the function under test (confirm).
+  @Override
+  String getFuncName() {
+    return new NotEqualsTransformFunction().getName();
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 223602c..fb59604 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -95,6 +95,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
   private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
       new ArrayList<>(getNumBrokers() + getNumServers());
+  private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
 
   protected int getNumBrokers() {
     return NUM_BROKERS;
@@ -104,8 +105,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     return NUM_SERVERS;
   }
 
-  private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
-
   @Override
   protected String getSchemaFileName() {
     return _schemaFileName;
@@ -753,6 +752,34 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
+  public void testCaseWhenStatement()
+      throws Exception {
+    testCountVsCaseQuery("origin = 'ATL'");
+    testCountVsCaseQuery("origin <> 'ATL'");
+    // Above: string-literal (in)equality on origin; below: integer-literal comparisons, one per operator.
+    testCountVsCaseQuery("DaysSinceEpoch > 16312");
+    testCountVsCaseQuery("DaysSinceEpoch >= 16312");
+    testCountVsCaseQuery("DaysSinceEpoch < 16312");
+    testCountVsCaseQuery("DaysSinceEpoch <= 16312");
+    testCountVsCaseQuery("DaysSinceEpoch = 16312");
+    testCountVsCaseQuery("DaysSinceEpoch <> 16312");
+  }
+
+  private void testCountVsCaseQuery(String predicate)
+      throws Exception {
+    // Baseline: count the rows of mytable that match the predicate using a plain WHERE filter.
+    String sqlQuery = String.format("SELECT COUNT(*) FROM mytable WHERE %s", predicate);
+    JsonNode response = postSqlQuery(sqlQuery, _brokerBaseApiUrl);
+    // The count is read from the first cell of the first row under "resultTable" -> "rows".
+    long countValue = response.get("resultTable").get("rows").get(0).get(0).asLong();
+    sqlQuery = String.format("SELECT SUM(CASE WHEN %s THEN 1 ELSE 0 END) as sum1 FROM mytable", predicate);
+    response = postSqlQuery(sqlQuery, _brokerBaseApiUrl);
+    // Summing 1 per row where the CASE condition holds (0 otherwise) must reproduce the baseline count.
+    long caseSum = response.get("resultTable").get("rows").get(0).get(0).asLong();
+    Assert.assertEquals(caseSum, countValue);
+  }
+
+  @Test
   public void testFilterWithInvertedIndexUDF()
       throws Exception {
     int daysSinceEpoch = 16138;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org