You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/05/28 09:27:59 UTC

[incubator-pinot] 02/02: Adding transform function support for case-when-else

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch support_case_when_statement
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit c7c941a5b4fe51bfb1b81bd57fb7040ca9413061
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed May 27 22:15:59 2020 -0700

    Adding transform function support for case-when-else
---
 .../common/function/TransformFunctionType.java     |   8 +
 .../pinot/sql/parsers/CalciteSqlCompilerTest.java  |  20 +-
 .../function/BinaryOperatorTransformFunction.java  | 113 ++++++++++++
 .../transform/function/CaseTransformFunction.java  | 130 +++++++++++++
 .../function/EqualsTransformFunction.java          |  24 +++
 .../GreaterThanOrEqualTransformFunction.java       |  32 ++++
 .../function/GreaterThanTransformFunction.java     |  32 ++++
 .../function/LessThanOrEqualTransformFunction.java |  32 ++++
 .../function/LessThanTransformFunction.java        |  32 ++++
 .../function/LiteralTransformFunction.java         |  36 +++-
 .../function/NotEqualsTransformFunction.java       |  24 +++
 .../function/TransformFunctionFactory.java         |   9 +
 .../function/BaseTransformFunctionTest.java        |  15 ++
 .../BinaryOperatorTransformFunctionTest.java       | 103 +++++++++++
 .../function/CaseTransformFunctionTest.java        | 204 +++++++++++++++++++++
 .../function/EqualsTransformFunctionTest.java      |  52 ++++++
 .../GreaterThanOrEqualTransformFunctionTest.java   |  52 ++++++
 .../function/GreaterThanTransformFunctionTest.java |  52 ++++++
 .../LessThanOrEqualTransformFunctionTest.java      |  52 ++++++
 .../function/LessThanTransformFunctionTest.java    |  52 ++++++
 .../function/NotEqualsTransformFunctionTest.java   |  52 ++++++
 .../tests/OfflineClusterIntegrationTest.java       |  31 +++-
 22 files changed, 1135 insertions(+), 22 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index f8081ce..86d5c8d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -38,7 +38,15 @@ public enum TransformFunctionType {
   LN("ln"),
   SQRT("sqrt"),
 
+  EQUALS("equals"),
+  NOT_EQUALS("not_equals"),
+  GREATER_THAN("greater_than"),
+  GREATER_THAN_OR_EQUAL("greater_than_or_equal"),
+  LESS_THAN("less_than"),
+  LESS_THAN_OR_EQUAL("less_than_or_equal"),
+
   CAST("cast"),
+  CASE("case"),
   JSONEXTRACTSCALAR("jsonExtractScalar"),
   JSONEXTRACTKEY("jsonExtractKey"),
   TIMECONVERT("timeConvert"),
diff --git a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
index 80ef72b..b590be5 100644
--- a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
@@ -52,13 +52,9 @@ public class CalciteSqlCompilerTest {
   @Test
   public void testCaseWhenStatements() {
     PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(
-        "SELECT OrderID, Quantity,\n"
-            + "CASE\n"
-            + "    WHEN Quantity > 30 THEN 'The quantity is greater than 30'\n"
-            + "    WHEN Quantity = 30 THEN 'The quantity is 30'\n"
-            + "    ELSE 'The quantity is under 30'\n"
-            + "END AS QuantityText\n"
-            + "FROM OrderDetails");
+        "SELECT OrderID, Quantity,\n" + "CASE\n" + "    WHEN Quantity > 30 THEN 'The quantity is greater than 30'\n"
+            + "    WHEN Quantity = 30 THEN 'The quantity is 30'\n" + "    ELSE 'The quantity is under 30'\n"
+            + "END AS QuantityText\n" + "FROM OrderDetails");
     Assert.assertEquals(pinotQuery.getSelectList().get(0).getIdentifier().getName(), "OrderID");
     Assert.assertEquals(pinotQuery.getSelectList().get(1).getIdentifier().getName(), "Quantity");
     Function asFunc = pinotQuery.getSelectList().get(2).getFunctionCall();
@@ -79,14 +75,8 @@ public class CalciteSqlCompilerTest {
     Assert.assertEquals(caseFunc.getOperands().get(4).getLiteral().getFieldValue(), "The quantity is under 30");
 
     pinotQuery = CalciteSqlParser.compileToPinotQuery(
-        "SELECT Quantity,\n"
-            + "SUM(CASE\n"
-            + "    WHEN Quantity > 30 THEN 3\n"
-            + "    WHEN Quantity > 20 THEN 2\n"
-            + "    WHEN Quantity > 10 THEN 1\n"
-            + "    ELSE 0\n"
-            + "END) AS new_sum_quant\n"
-            + "FROM OrderDetails");
+        "SELECT Quantity,\n" + "SUM(CASE\n" + "    WHEN Quantity > 30 THEN 3\n" + "    WHEN Quantity > 20 THEN 2\n"
+            + "    WHEN Quantity > 10 THEN 1\n" + "    ELSE 0\n" + "END) AS new_sum_quant\n" + "FROM OrderDetails");
     Assert.assertEquals(pinotQuery.getSelectList().get(0).getIdentifier().getName(), "Quantity");
     asFunc = pinotQuery.getSelectList().get(1).getFunctionCall();
     Assert.assertEquals(asFunc.getOperator(), SqlKind.AS.name());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
new file mode 100644
index 0000000..d97a5d4
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * BinaryOperatorTransformFunction abstracts common functions for binary operators (=, !=, >=, >, <=, <)
+ * The results are in boolean format and stored as an integer array with 1 represents true and 0 represents false.
+ */
+public abstract class BinaryOperatorTransformFunction extends BaseTransformFunction {
+
+  protected TransformFunction _leftTransformFunction;
+  protected TransformFunction _rightTransformFunction;
+  protected int[] _results;
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() != 2) {
+      throw new IllegalArgumentException("Exact 2 arguments are required for greater transform function");
+    }
+    _leftTransformFunction = arguments.get(0);
+    _rightTransformFunction = arguments.get(1);
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return INT_SV_NO_DICTIONARY_METADATA;
+  }
+
+  protected void fillResultArray(ProjectionBlock projectionBlock) {
+    if (_results == null) {
+      _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+    FieldSpec.DataType dataType = _leftTransformFunction.getResultMetadata().getDataType();
+    int length = projectionBlock.getNumDocs();
+    switch (dataType) {
+      case INT:
+        int[] leftIntValues = _leftTransformFunction.transformToIntValuesSV(projectionBlock);
+        int[] rightIntValues = _rightTransformFunction.transformToIntValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(((Integer) leftIntValues[i]).compareTo(rightIntValues[i]));
+        }
+        break;
+      case LONG:
+        long[] leftLongValues = _leftTransformFunction.transformToLongValuesSV(projectionBlock);
+        long[] rightLongValues = _rightTransformFunction.transformToLongValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(((Long) leftLongValues[i]).compareTo(rightLongValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] leftFloatValues = _leftTransformFunction.transformToFloatValuesSV(projectionBlock);
+        float[] rightFloatValues = _rightTransformFunction.transformToFloatValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(((Float) leftFloatValues[i]).compareTo(rightFloatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] leftDoubleValues = _leftTransformFunction.transformToDoubleValuesSV(projectionBlock);
+        double[] rightDoubleValues = _rightTransformFunction.transformToDoubleValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(((Double) leftDoubleValues[i]).compareTo(rightDoubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] leftStringValues = _leftTransformFunction.transformToStringValuesSV(projectionBlock);
+        String[] rightStringValues = _rightTransformFunction.transformToStringValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] = getBinaryFuncResult(leftStringValues[i].compareTo(rightStringValues[i]));
+        }
+        break;
+      case BYTES:
+        byte[][] leftBytesValues = _leftTransformFunction.transformToBytesValuesSV(projectionBlock);
+        byte[][] rightBytesValues = _rightTransformFunction.transformToBytesValuesSV(projectionBlock);
+        for (int i = 0; i < length; i++) {
+          _results[i] =
+              getBinaryFuncResult((new ByteArray(leftBytesValues[i])).compareTo(new ByteArray(rightBytesValues[i])));
+        }
+        break;
+      // NOTE: Multi-value columns are not comparable, so we should not reach here
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  abstract int getBinaryFuncResult(int result);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
new file mode 100644
index 0000000..71e1c43
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+
+
+public class CaseTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "case";
+
+  private int _numberWhenStatements;
+  private List<TransformFunction> _whenStatements = new ArrayList<>();
+  private List<LiteralTransformFunction> _elseThenStatements = new ArrayList<>();
+  private TransformResultMetadata _resultMetadata;
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add((LiteralTransformFunction) arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add((LiteralTransformFunction) arguments.get(i));
+    }
+    _resultMetadata = _elseThenStatements.get(0).getResultMetadata();
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return _resultMetadata;
+  }
+
+  private int[] getSelectedArray(ProjectionBlock projectionBlock) {
+    int[] selected = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      TransformFunction transformFunction = _whenStatements.get(i);
+      int[] conditions = transformFunction.transformToIntValuesSV(projectionBlock);
+      for (int j = 0; j < conditions.length; j++) {
+        if (selected[j] == 0 && conditions[j] == 1) {
+          selected[j] = i + 1;
+        }
+      }
+    }
+    return selected;
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    int[] results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < selected.length; i++) {
+      results[i] = Integer.parseInt(_elseThenStatements.get(selected[i]).getLiteral());
+    }
+    return results;
+  }
+
+  @Override
+  public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    long[] results = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < selected.length; i++) {
+      results[i] = Long.parseLong(_elseThenStatements.get(selected[i]).getLiteral());
+    }
+    return results;
+  }
+
+  @Override
+  public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    float[] results = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < selected.length; i++) {
+      results[i] = Float.parseFloat(_elseThenStatements.get(selected[i]).getLiteral());
+    }
+    return results;
+  }
+
+  @Override
+  public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    double[] results = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < selected.length; i++) {
+      results[i] = Double.parseDouble(_elseThenStatements.get(selected[i]).getLiteral());
+    }
+    return results;
+  }
+
+  @Override
+  public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
+    int[] selected = getSelectedArray(projectionBlock);
+    String[] results = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < selected.length; i++) {
+      results[i] = _elseThenStatements.get(selected[i]).getLiteral();
+    }
+    return results;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunction.java
new file mode 100644
index 0000000..1bd5e58
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunction.java
@@ -0,0 +1,24 @@
+package org.apache.pinot.core.operator.transform.function;
+
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+public class EqualsTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.EQUALS.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result == 0) ? 1 : 0;
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunction.java
new file mode 100644
index 0000000..5afbdd2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunction.java
@@ -0,0 +1,32 @@
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+public class GreaterThanOrEqualTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    super.init(arguments, dataSourceMap);
+  }
+
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result >= 0) ? 1 : 0;
+  }
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.GREATER_THAN_OR_EQUAL.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunction.java
new file mode 100644
index 0000000..fc36ded
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunction.java
@@ -0,0 +1,32 @@
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+public class GreaterThanTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    super.init(arguments, dataSourceMap);
+  }
+
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result > 0) ? 1 : 0;
+  }
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.GREATER_THAN.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunction.java
new file mode 100644
index 0000000..21c5fce
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunction.java
@@ -0,0 +1,32 @@
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+public class LessThanOrEqualTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    super.init(arguments, dataSourceMap);
+  }
+
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result <= 0) ? 1 : 0;
+  }
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.LESS_THAN_OR_EQUAL.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunction.java
new file mode 100644
index 0000000..717597c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunction.java
@@ -0,0 +1,32 @@
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+public class LessThanTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+    super.init(arguments, dataSourceMap);
+  }
+
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result < 0) ? 1 : 0;
+  }
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.LESS_THAN.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
index 4317a2b..43e8dfc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
@@ -26,6 +26,7 @@ import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.utils.BytesUtils;
 
 
 /**
@@ -35,6 +36,11 @@ import org.apache.pinot.core.segment.index.readers.Dictionary;
 public class LiteralTransformFunction implements TransformFunction {
   private final String _literal;
   private String[] _result;
+  private int[] _intResult;
+  private long[] _longResult;
+  private float[] _floatResult;
+  private double[] _doubleResult;
+  private byte[][] _bytesResult;
 
   public LiteralTransformFunction(String literal) {
     _literal = literal;
@@ -75,22 +81,38 @@ public class LiteralTransformFunction implements TransformFunction {
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_intResult == null) {
+      _intResult = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_intResult, Integer.parseInt(_literal));
+    }
+    return _intResult;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_longResult == null) {
+      _longResult = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_longResult, Long.parseLong(_literal));
+    }
+    return _longResult;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_floatResult == null) {
+      _floatResult = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_floatResult, Float.parseFloat(_literal));
+    }
+    return _floatResult;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_doubleResult == null) {
+      _doubleResult = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_doubleResult, Double.parseDouble(_literal));
+    }
+    return _doubleResult;
   }
 
   @Override
@@ -104,7 +126,11 @@ public class LiteralTransformFunction implements TransformFunction {
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_bytesResult == null) {
+      _bytesResult = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+      Arrays.fill(_bytesResult, BytesUtils.toBytes(_literal));
+    }
+    return _bytesResult;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunction.java
new file mode 100644
index 0000000..32ad40f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunction.java
@@ -0,0 +1,24 @@
+package org.apache.pinot.core.operator.transform.function;
+
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+
+
+public class NotEqualsTransformFunction extends BinaryOperatorTransformFunction {
+
+  @Override
+  public String getName() {
+    return TransformFunctionType.NOT_EQUALS.getName();
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+
+  @Override
+  int getBinaryFuncResult(int result) {
+    return (result != 0) ? 1 : 0;
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index e9bd6bc..059db49 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -73,6 +73,15 @@ public class TransformFunctionFactory {
           put(TransformFunctionType.ARRAYLENGTH.getName().toLowerCase(), ArrayLengthTransformFunction.class);
           put(TransformFunctionType.VALUEIN.getName().toLowerCase(), ValueInTransformFunction.class);
           put(TransformFunctionType.MAPVALUE.getName().toLowerCase(), MapValueTransformFunction.class);
+
+          put(TransformFunctionType.CASE.getName().toLowerCase(), CaseTransformFunction.class);
+
+          put(TransformFunctionType.EQUALS.getName().toLowerCase(), EqualsTransformFunction.class);
+          put(TransformFunctionType.NOT_EQUALS.getName().toLowerCase(), NotEqualsTransformFunction.class);
+          put(TransformFunctionType.GREATER_THAN.getName().toLowerCase(), GreaterThanTransformFunction.class);
+          put(TransformFunctionType.GREATER_THAN_OR_EQUAL.getName().toLowerCase(), GreaterThanOrEqualTransformFunction.class);
+          put(TransformFunctionType.LESS_THAN.getName().toLowerCase(), LessThanTransformFunction.class);
+          put(TransformFunctionType.LESS_THAN_OR_EQUAL.getName().toLowerCase(), LessThanOrEqualTransformFunction.class);
         }
       };
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index ab9949c..1649d03 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -154,6 +154,21 @@ public abstract class BaseTransformFunctionTest {
         new DocIdSetOperator(new MatchAllFilterOperator(NUM_ROWS), DocIdSetPlanNode.MAX_DOC_PER_CALL)).nextBlock();
   }
 
+  protected void testTransformFunction(TransformFunction transformFunction, int[] expectedValues) {
+    int[] intValues = transformFunction.transformToIntValuesSV(_projectionBlock);
+    long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
+    float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
+    double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
+    String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Assert.assertEquals(intValues[i], expectedValues[i]);
+      Assert.assertEquals(longValues[i], expectedValues[i]);
+      Assert.assertEquals(floatValues[i], (float) expectedValues[i]);
+      Assert.assertEquals(doubleValues[i], (double) expectedValues[i]);
+      Assert.assertEquals(stringValues[i], Integer.toString(expectedValues[i]));
+    }
+  }
+
   protected void testTransformFunction(TransformFunction transformFunction, double[] expectedValues) {
     int[] intValues = transformFunction.transformToIntValuesSV(_projectionBlock);
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunctionTest.java
new file mode 100644
index 0000000..59d7b0d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunctionTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public abstract class BinaryOperatorTransformFunctionTest extends BaseTransformFunctionTest {
+
+  abstract int getExpectedValue(int value, int toCompare);
+
+  abstract int getExpectedValue(long value, long toCompare);
+
+  abstract int getExpectedValue(float value, float toCompare);
+
+  abstract int getExpectedValue(double value, double toCompare);
+
+  abstract int getExpectedValue(String value, String toCompare);
+
+  abstract String getFuncName();
+
+  @Test
+  public void testBinaryOperatorTransformFunction() {
+    TransformExpressionTree expression = TransformExpressionTree
+        .compileToExpressionTree(String.format("%s(%s, %d)", getFuncName(), INT_SV_COLUMN, _intSVValues[0]));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertEquals(transformFunction.getName(), getFuncName().toLowerCase());
+    int[] expectedIntValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedIntValues[i] = getExpectedValue(_intSVValues[i], _intSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedIntValues);
+
+    expression = TransformExpressionTree
+        .compileToExpressionTree(String.format("%s(%s, %d)", getFuncName(), LONG_SV_COLUMN, _longSVValues[0]));
+    transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    int[] expectedLongValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedLongValues[i] = getExpectedValue(_longSVValues[i], _longSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedLongValues);
+
+    expression = TransformExpressionTree
+        .compileToExpressionTree(String.format("%s(%s, %f)", getFuncName(), FLOAT_SV_COLUMN, _floatSVValues[0]));
+    transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    int[] expectedFloatValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedFloatValues[i] = getExpectedValue(_floatSVValues[i], _floatSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedFloatValues);
+
+    expression = TransformExpressionTree
+        .compileToExpressionTree(String.format("%s(%s, %.20f)", getFuncName(), DOUBLE_SV_COLUMN, _doubleSVValues[0]));
+    transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    int[] expectedDoubleValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedDoubleValues[i] = getExpectedValue(_doubleSVValues[i], _doubleSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedDoubleValues);
+
+    expression = TransformExpressionTree
+        .compileToExpressionTree(String.format("%s(%s, '%s')", getFuncName(), STRING_SV_COLUMN, _stringSVValues[0]));
+    transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    int[] expectedStringValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedStringValues[i] = getExpectedValue(_stringSVValues[i], _stringSVValues[0]);
+    }
+    testTransformFunction(transformFunction, expectedStringValues);
+  }
+
+  @Test(dataProvider = "testIllegalArguments", expectedExceptions = {BadQueryRequestException.class})
+  public void testIllegalArguments(String expressionStr) {
+    TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(expressionStr);
+    TransformFunctionFactory.get(expression, _dataSourceMap);
+  }
+
+  @DataProvider(name = "testIllegalArguments")
+  public Object[][] testIllegalArguments() {
+    return new Object[][]{new Object[]{String.format("%s(%s)", getFuncName(),
+        INT_SV_COLUMN)}, new Object[]{String.format("%s(%s, %s, %s)", getFuncName(), LONG_SV_COLUMN, INT_SV_COLUMN,
+        STRING_SV_COLUMN)}};
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java
new file mode 100644
index 0000000..cb35425
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.Random;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
+
+  private final int INDEX_TO_COMPARE = new Random(System.currentTimeMillis()).nextInt(NUM_ROWS);
+  private final TransformFunctionType[] BINARY_OPERATOR_TRANSFORM_FUNCTIONS =
+      new TransformFunctionType[]{TransformFunctionType.EQUALS, TransformFunctionType.NOT_EQUALS, TransformFunctionType.GREATER_THAN, TransformFunctionType.GREATER_THAN_OR_EQUAL, TransformFunctionType.LESS_THAN, TransformFunctionType.LESS_THAN_OR_EQUAL};
+
+  @Test
+  public void testCaseTransformFunction() {
+    for (TransformFunctionType functionType : BINARY_OPERATOR_TRANSFORM_FUNCTIONS) {
+      testCountVsCaseQuery(String.format("%s(%s, %s)", functionType.getName(), INT_SV_COLUMN,
+          String.format("%d", _intSVValues[INDEX_TO_COMPARE])), getExpectedValuesFor(INT_SV_COLUMN, functionType));
+      testCountVsCaseQuery(String.format("%s(%s, %s)", functionType.getName(), LONG_SV_COLUMN,
+          String.format("%d", _longSVValues[INDEX_TO_COMPARE])), getExpectedValuesFor(LONG_SV_COLUMN, functionType));
+      testCountVsCaseQuery(String.format("%s(%s, %s)", functionType.getName(), FLOAT_SV_COLUMN,
+          String.format("%f", _floatSVValues[INDEX_TO_COMPARE])), getExpectedValuesFor(FLOAT_SV_COLUMN, functionType));
+      testCountVsCaseQuery(String.format("%s(%s, %s)", functionType.getName(), DOUBLE_SV_COLUMN,
+          String.format("%.20f", _doubleSVValues[INDEX_TO_COMPARE])),
+          getExpectedValuesFor(DOUBLE_SV_COLUMN, functionType));
+      testCountVsCaseQuery(String.format("%s(%s, %s)", functionType.getName(), STRING_SV_COLUMN,
+          String.format("'%s'", _stringSVValues[INDEX_TO_COMPARE])),
+          getExpectedValuesFor(STRING_SV_COLUMN, functionType));
+    }
+  }
+
+  private void testCountVsCaseQuery(String predicate, int[] expectedValues) {
+    TransformExpressionTree expression =
+        TransformExpressionTree.compileToExpressionTree(String.format("CASE(%s, 100, 10)", predicate));
+    expression.getChildren().set(0, TransformExpressionTree.compileToExpressionTree(predicate));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof CaseTransformFunction);
+    Assert.assertEquals(transformFunction.getName(), CaseTransformFunction.FUNCTION_NAME);
+    testTransformFunction(transformFunction, expectedValues);
+  }
+
+  @Test(dataProvider = "testIllegalArguments", expectedExceptions = {BadQueryRequestException.class})
+  public void testIllegalArguments(String expressionStr) {
+    TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(expressionStr);
+    TransformFunctionFactory.get(expression, _dataSourceMap);
+  }
+
+  @DataProvider(name = "testIllegalArguments")
+  public Object[][] testIllegalArguments() {
+    return new Object[][]{new Object[]{String.format("case(%s)", INT_SV_COLUMN)}, new Object[]{String.format(
+        "case(%s, %s)", LONG_SV_COLUMN, 10)}};
+  }
+
+  private int[] getExpectedValuesFor(String column, TransformFunctionType type) {
+    int[] result = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      switch (column) {
+        case INT_SV_COLUMN:
+          switch (type) {
+            case EQUALS:
+              result[i] = (_intSVValues[i] == _intSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case NOT_EQUALS:
+              result[i] = (_intSVValues[i] != _intSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case GREATER_THAN:
+              result[i] = (_intSVValues[i] > _intSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case GREATER_THAN_OR_EQUAL:
+              result[i] = (_intSVValues[i] >= _intSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case LESS_THAN:
+              result[i] = (_intSVValues[i] < _intSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case LESS_THAN_OR_EQUAL:
+              result[i] = (_intSVValues[i] <= _intSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            default:
+              throw new IllegalStateException("Not supported type - " + type);
+          }
+          break;
+        case LONG_SV_COLUMN:
+          switch (type) {
+            case EQUALS:
+              result[i] = (_longSVValues[i] == _longSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case NOT_EQUALS:
+              result[i] = (_longSVValues[i] != _longSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case GREATER_THAN:
+              result[i] = (_longSVValues[i] > _longSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case GREATER_THAN_OR_EQUAL:
+              result[i] = (_longSVValues[i] >= _longSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case LESS_THAN:
+              result[i] = (_longSVValues[i] < _longSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case LESS_THAN_OR_EQUAL:
+              result[i] = (_longSVValues[i] <= _longSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            default:
+              throw new IllegalStateException("Not supported type - " + type);
+          }
+          break;
+        case FLOAT_SV_COLUMN:
+          switch (type) {
+            case EQUALS:
+              result[i] = (_floatSVValues[i] == _floatSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case NOT_EQUALS:
+              result[i] = (_floatSVValues[i] != _floatSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case GREATER_THAN:
+              result[i] = (_floatSVValues[i] > _floatSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case GREATER_THAN_OR_EQUAL:
+              result[i] = (_floatSVValues[i] >= _floatSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case LESS_THAN:
+              result[i] = (_floatSVValues[i] < _floatSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case LESS_THAN_OR_EQUAL:
+              result[i] = (_floatSVValues[i] <= _floatSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            default:
+              throw new IllegalStateException("Not supported type - " + type);
+          }
+          break;
+        case DOUBLE_SV_COLUMN:
+          switch (type) {
+            case EQUALS:
+              result[i] = (_doubleSVValues[i] == _doubleSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case NOT_EQUALS:
+              result[i] = (_doubleSVValues[i] != _doubleSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case GREATER_THAN:
+              result[i] = (_doubleSVValues[i] > _doubleSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case GREATER_THAN_OR_EQUAL:
+              result[i] = (_doubleSVValues[i] >= _doubleSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case LESS_THAN:
+              result[i] = (_doubleSVValues[i] < _doubleSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            case LESS_THAN_OR_EQUAL:
+              result[i] = (_doubleSVValues[i] <= _doubleSVValues[INDEX_TO_COMPARE]) ? 100 : 10;
+              break;
+            default:
+              throw new IllegalStateException("Not supported type - " + type);
+          }
+          break;
+        case STRING_SV_COLUMN:
+          switch (type) {
+            case EQUALS:
+              result[i] = (_stringSVValues[i].compareTo(_stringSVValues[INDEX_TO_COMPARE]) == 0) ? 100 : 10;
+              break;
+            case NOT_EQUALS:
+              result[i] = (_stringSVValues[i].compareTo(_stringSVValues[INDEX_TO_COMPARE]) != 0) ? 100 : 10;
+              break;
+            case GREATER_THAN:
+              result[i] = (_stringSVValues[i].compareTo(_stringSVValues[INDEX_TO_COMPARE]) > 0) ? 100 : 10;
+              break;
+            case GREATER_THAN_OR_EQUAL:
+              result[i] = (_stringSVValues[i].compareTo(_stringSVValues[INDEX_TO_COMPARE]) >= 0) ? 100 : 10;
+              break;
+            case LESS_THAN:
+              result[i] = (_stringSVValues[i].compareTo(_stringSVValues[INDEX_TO_COMPARE]) < 0) ? 100 : 10;
+              break;
+            case LESS_THAN_OR_EQUAL:
+              result[i] = (_stringSVValues[i].compareTo(_stringSVValues[INDEX_TO_COMPARE]) <= 0) ? 100 : 10;
+              break;
+            default:
+              throw new IllegalStateException("Not supported type - " + type);
+          }
+          break;
+      }
+    }
+    return result;
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunctionTest.java
new file mode 100644
index 0000000..6e9dab5
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/EqualsTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class EqualsTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value == toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value == toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value == toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value == toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) == 0) ? 1 : 0;
+  }
+
+  @Override
+  String getFuncName() {
+    return new EqualsTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunctionTest.java
new file mode 100644
index 0000000..8c26967
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanOrEqualTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class GreaterThanOrEqualTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value >= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value >= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value >= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value >= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) >= 0) ? 1 : 0;
+  }
+
+  @Override
+  String getFuncName() {
+    return new GreaterThanOrEqualTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunctionTest.java
new file mode 100644
index 0000000..ee0d5e5
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/GreaterThanTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class GreaterThanTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value > toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value > toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value > toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value > toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) > 0) ? 1 : 0;
+  }
+
+  @Override
+  String getFuncName() {
+    return new GreaterThanTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunctionTest.java
new file mode 100644
index 0000000..9c3569b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanOrEqualTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class LessThanOrEqualTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value <= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value <= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value <= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value <= toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) <= 0) ? 1 : 0;
+  }
+
+  @Override
+  String getFuncName() {
+    return new LessThanOrEqualTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunctionTest.java
new file mode 100644
index 0000000..f04b06b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LessThanTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class LessThanTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value < toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value < toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value < toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value < toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) < 0) ? 1 : 0;
+  }
+
+  @Override
+  String getFuncName() {
+    return new LessThanTransformFunction().getName();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunctionTest.java
new file mode 100644
index 0000000..7567730
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NotEqualsTransformFunctionTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+public class NotEqualsTransformFunctionTest extends BinaryOperatorTransformFunctionTest {
+
+  @Override
+  int getExpectedValue(int value, int toCompare) {
+    return (value != toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(long value, long toCompare) {
+    return (value != toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(float value, float toCompare) {
+    return (value != toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(double value, double toCompare) {
+    return (value != toCompare) ? 1 : 0;
+  }
+
+  @Override
+  int getExpectedValue(String value, String toCompare) {
+    return (value.compareTo(toCompare) != 0) ? 1 : 0;
+  }
+
+  @Override
+  String getFuncName() {
+    return new NotEqualsTransformFunction().getName();
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 223602c..fb59604 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -95,6 +95,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
   private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
       new ArrayList<>(getNumBrokers() + getNumServers());
+  private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
 
   protected int getNumBrokers() {
     return NUM_BROKERS;
@@ -104,8 +105,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     return NUM_SERVERS;
   }
 
-  private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
-
   @Override
   protected String getSchemaFileName() {
     return _schemaFileName;
@@ -753,6 +752,34 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
+  public void testCaseWhenStatement()
+      throws Exception {
+    testCountVsCaseQuery("origin = 'ATL'");
+    testCountVsCaseQuery("origin <> 'ATL'");
+
+    testCountVsCaseQuery("DaysSinceEpoch > 16312");
+    testCountVsCaseQuery("DaysSinceEpoch >= 16312");
+    testCountVsCaseQuery("DaysSinceEpoch < 16312");
+    testCountVsCaseQuery("DaysSinceEpoch <= 16312");
+    testCountVsCaseQuery("DaysSinceEpoch = 16312");
+    testCountVsCaseQuery("DaysSinceEpoch <> 16312");
+  }
+
+  private void testCountVsCaseQuery(String predicate)
+      throws Exception {
+    // System.out.println("predicate = " + predicate);
+    String sqlQuery = String.format("SELECT COUNT(*) FROM mytable WHERE %s", predicate);
+    JsonNode response = postSqlQuery(sqlQuery, _brokerBaseApiUrl);
+    // System.out.println(String.format("query = %s, response = %s",sqlQuery, response));
+    long countValue = response.get("resultTable").get("rows").get(0).get(0).asLong();
+    sqlQuery = String.format("SELECT SUM(CASE WHEN %s THEN 1 ELSE 0 END) as sum1 FROM mytable", predicate);
+    response = postSqlQuery(sqlQuery, _brokerBaseApiUrl);
+    // System.out.println(String.format("query = %s, response = %s",sqlQuery, response));
+    long caseSum = response.get("resultTable").get("rows").get(0).get(0).asLong();
+    Assert.assertEquals(caseSum, countValue);
+  }
+
+  @Test
   public void testFilterWithInvertedIndexUDF()
       throws Exception {
     int daysSinceEpoch = 16138;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org