You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/15 19:39:39 UTC

[pinot] branch master updated: [multistage] [testing] Filter Operator Unit Test (#9792)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b8c8a0fe8 [multistage] [testing] Filter Operator Unit Test (#9792)
7b8c8a0fe8 is described below

commit 7b8c8a0fe8fca07d0cad66d0bfa4d02d31dc74c6
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Tue Nov 15 11:39:33 2022 -0800

    [multistage] [testing] Filter Operator Unit Test (#9792)
---
 .../pinot/query/planner/logical/RexExpression.java |   6 +
 .../pinot/query/planner/logical/StagePlanner.java  |   2 +-
 .../query/runtime/operator/FilterOperator.java     |  14 +-
 .../runtime/operator/operands/FilterOperand.java   |  52 ++--
 .../query/runtime/operator/FilterOperatorTest.java | 288 +++++++++++++++++++++
 .../query/runtime/operator/OperatorTestUtil.java   |   5 +
 6 files changed, 345 insertions(+), 22 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index 3e964e77d0..e9a4a99679 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -203,12 +203,18 @@ public interface RexExpression {
   }
 
   class FunctionCall implements RexExpression {
+    // the underlying SQL operator kind of this function.
+    // It can be either a standard SQL operator or an extended function kind.
+    // @see #SqlKind.FUNCTION, #SqlKind.OTHER, #SqlKind.OTHER_FUNCTION
     @ProtoProperties
     private SqlKind _sqlKind;
+    // the return data type of the function.
     @ProtoProperties
     private FieldSpec.DataType _dataType;
+    // the name of the SQL function. For standard SqlKind it should match the SqlKind ENUM name.
     @ProtoProperties
     private String _functionName;
+    // the list of RexExpressions that represents the operands to the function.
     @ProtoProperties
     private List<RexExpression> _functionOperands;
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 323e7f506b..2d61856c85 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -41,7 +41,7 @@ import org.apache.pinot.query.routing.WorkerManager;
  * This class is non-threadsafe. Do not reuse the stage planner for multiple query plans.
  */
 public class StagePlanner {
-  private final PlannerContext _plannerContext;
+  private final PlannerContext _plannerContext;   // DO NOT REMOVE.
   private final WorkerManager _workerManager;
   private int _stageIdCounter;
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index f639020b25..66956a95ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -30,7 +30,19 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
 
-
+/*
+   FilterOperator applies a filter to the rows coming from upstreamOperator.
+   There are three types of filter operands
+   1) inputRef
+   2) Literal
+   3) FunctionOperand
+   The result of each of the three types has to be a boolean so that it can be used to filter rows.
+   FunctionOperand supports,
+    1) AND, OR, NOT functions to combine operands.
+    2) Binary Operand: equals, notEquals, greaterThan, greaterThanOrEqual, lessThan, lessThanOrEqual
+    3) All boolean scalar functions we have that take a transformOperand.
+    Note: Scalar functions are the ones we have in the v1 engine; they are matched only by function name and arg count.
+ */
 public class FilterOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "FILTER";
   private final Operator<TransferableBlock> _upstreamOperator;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
index 3c58d1710a..b152b33b88 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
@@ -41,69 +41,73 @@ public abstract class FilterOperand extends TransformOperand {
     }
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.Literal literal) {
+  private static FilterOperand toFilterOperand(RexExpression.Literal literal) {
     return new BooleanLiteral(literal);
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, DataSchema dataSchema) {
+  private static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, DataSchema dataSchema) {
     return new BooleanInputRef(inputRef, dataSchema);
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
-
+  private static FilterOperand toFilterOperand(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
+    int operandSize = functionCall.getFunctionOperands().size();
+    // TODO: Move these functions out of this class.
     switch (OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
       case "AND":
+        Preconditions.checkState(operandSize >= 2, "AND takes >=2 argument, passed in argument size:" + operandSize);
         return new And(functionCall.getFunctionOperands(), dataSchema);
       case "OR":
+        Preconditions.checkState(operandSize >= 2, "OR takes >=2 argument, passed in argument size:" + operandSize);
         return new Or(functionCall.getFunctionOperands(), dataSchema);
       case "NOT":
+        Preconditions.checkState(operandSize == 1, "NOT takes one argument, passed in argument size:" + operandSize);
         return new Not(toFilterOperand(functionCall.getFunctionOperands().get(0), dataSchema));
       case "equals":
         return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
           @Override
           public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
-                _resultType.convert(_rhs.apply(row))) == 0;
+            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+                == 0;
           }
         };
       case "notEquals":
         return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
           @Override
           public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
-                _resultType.convert(_rhs.apply(row))) != 0;
+            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+                != 0;
           }
         };
       case "greaterThan":
         return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
           @Override
           public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
-                _resultType.convert(_rhs.apply(row))) > 0;
+            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+                > 0;
           }
         };
       case "greaterThanOrEqual":
         return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
           @Override
           public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
-                _resultType.convert(_rhs.apply(row))) >= 0;
+            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+                >= 0;
           }
         };
       case "lessThan":
         return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
           @Override
           public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
-                _resultType.convert(_rhs.apply(row))) < 0;
+            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+                < 0;
           }
         };
       case "lessThanOrEqual":
         return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
           @Override
           public Boolean apply(Object[] row) {
-            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
-                _resultType.convert(_rhs.apply(row))) <= 0;
+            return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+                <= 0;
           }
         };
       default:
@@ -119,7 +123,8 @@ public abstract class FilterOperand extends TransformOperand {
 
     public BooleanFunction(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
       FunctionOperand func = (FunctionOperand) TransformOperand.toTransformOperand(functionCall, dataSchema);
-      Preconditions.checkState(func.getResultType() == DataSchema.ColumnDataType.BOOLEAN);
+      Preconditions.checkState(func.getResultType() == DataSchema.ColumnDataType.BOOLEAN,
+          "Expecting boolean result type but got type:" + func.getResultType());
       _func = func;
     }
 
@@ -133,8 +138,9 @@ public abstract class FilterOperand extends TransformOperand {
     private final RexExpression.InputRef _inputRef;
 
     public BooleanInputRef(RexExpression.InputRef inputRef, DataSchema dataSchema) {
-      Preconditions.checkState(dataSchema.getColumnDataType(inputRef.getIndex())
-          == DataSchema.ColumnDataType.BOOLEAN);
+      DataSchema.ColumnDataType inputType = dataSchema.getColumnDataType(inputRef.getIndex());
+      Preconditions.checkState(inputType == DataSchema.ColumnDataType.BOOLEAN,
+          "Input has to be boolean type but got type:" + inputType);
       _inputRef = inputRef;
     }
 
@@ -148,7 +154,8 @@ public abstract class FilterOperand extends TransformOperand {
     private final Object _literalValue;
 
     public BooleanLiteral(RexExpression.Literal literal) {
-      Preconditions.checkState(literal.getDataType() == FieldSpec.DataType.BOOLEAN);
+      Preconditions.checkState(literal.getDataType() == FieldSpec.DataType.BOOLEAN,
+          "Only boolean literal is supported as filter, but got type:" + literal.getDataType());
       _literalValue = literal.getValue();
     }
 
@@ -160,6 +167,7 @@ public abstract class FilterOperand extends TransformOperand {
 
   private static class And extends FilterOperand {
     List<FilterOperand> _childOperands;
+
     public And(List<RexExpression> childExprs, DataSchema dataSchema) {
       _childOperands = new ArrayList<>(childExprs.size());
       for (RexExpression childExpr : childExprs) {
@@ -180,6 +188,7 @@ public abstract class FilterOperand extends TransformOperand {
 
   private static class Or extends FilterOperand {
     List<FilterOperand> _childOperands;
+
     public Or(List<RexExpression> childExprs, DataSchema dataSchema) {
       _childOperands = new ArrayList<>(childExprs.size());
       for (RexExpression childExpr : childExprs) {
@@ -200,6 +209,7 @@ public abstract class FilterOperand extends TransformOperand {
 
   private static class Not extends FilterOperand {
     FilterOperand _childOperand;
+
     public Not(FilterOperand childOperand) {
       _childOperand = childOperand;
     }
@@ -216,6 +226,8 @@ public abstract class FilterOperand extends TransformOperand {
     protected final DataSchema.ColumnDataType _resultType;
 
     public Predicate(List<RexExpression> functionOperands, DataSchema dataSchema) {
+      Preconditions.checkState(functionOperands.size() == 2,
+          "Expected 2 function ops for Predicate but got:" + functionOperands.size());
       _lhs = TransformOperand.toTransformOperand(functionOperands.get(0), dataSchema);
       _rhs = TransformOperand.toTransformOperand(functionOperands.get(1), dataSchema);
       if (_lhs._resultType != null && _lhs._resultType != DataSchema.ColumnDataType.OBJECT) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
new file mode 100644
index 0000000000..7b1f5bce5f
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class FilterOperatorTest {
+  private AutoCloseable _mocks;
+  @Mock
+  private Operator<TransferableBlock> _upstreamOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldPropagateUpstreamErrorBlock() {
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("filterError")));
+    RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+    DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.BOOLEAN
+    });
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+    TransferableBlock errorBlock = op.getNextBlock();
+    Assert.assertTrue(errorBlock.isErrorBlock());
+    DataBlock error = errorBlock.getDataBlock();
+    Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
+  }
+
+  @Test
+  public void shouldPropagateUpstreamEOS() {
+    RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertTrue(dataBlock.isEndOfStreamBlock());
+  }
+
+  @Test
+  public void shouldPropagateUpstreamNoop() {
+    RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertTrue(dataBlock.isNoOpBlock());
+  }
+
+  @Test
+  public void shouldHandleTrueBooleanLiteralFilter() {
+    RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new Object[]{1}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 2);
+    Assert.assertEquals(result.get(0)[0], 0);
+    Assert.assertEquals(result.get(1)[0], 1);
+  }
+
+  @Test
+  public void shouldHandleFalseBooleanLiteralFilter() {
+    RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, false);
+
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertTrue(result.isEmpty());
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*boolean literal.*")
+  public void shouldThrowOnNonBooleanTypeBooleanLiteral() {
+    RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "false");
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Input has to be "
+      + "boolean type.*")
+  public void shouldThrowOnNonBooleanTypeInputRef() {
+    RexExpression ref0 = new RexExpression.InputRef(0);
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0);
+  }
+
+  @Test
+  public void shouldHandleBooleanInputRef() {
+    RexExpression ref1 = new RexExpression.InputRef(1);
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol", "boolCol"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.BOOLEAN
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, new Object[]{2, false}));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0)[0], 1);
+    Assert.assertEquals(result.get(0)[1], true);
+  }
+
+  @Test
+  public void shouldHandleAndFilter() {
+    DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+    });
+    Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+        OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false},
+            new Object[]{true, false}));
+    RexExpression.FunctionCall andCall = new RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND",
+        ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
+
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0)[0], true);
+    Assert.assertEquals(result.get(0)[1], true);
+  }
+
+  @Test
+  public void shouldHandleOrFilter() {
+    DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+    });
+    Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+        OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false},
+            new Object[]{true, false}));
+    RexExpression.FunctionCall orCall = new RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR",
+        ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
+
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 2);
+    Assert.assertEquals(result.get(0)[0], true);
+    Assert.assertEquals(result.get(0)[1], true);
+    Assert.assertEquals(result.get(1)[0], true);
+    Assert.assertEquals(result.get(1)[1], false);
+  }
+
+  @Test
+  public void shouldHandleNotFilter() {
+    DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+    });
+    Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+        OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false},
+            new Object[]{true, false}));
+    RexExpression.FunctionCall notCall = new RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT",
+        ImmutableList.of(new RexExpression.InputRef(0)));
+
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0)[0], false);
+    Assert.assertEquals(result.get(0)[1], false);
+  }
+
+  @Test
+  public void shouldHandleGreaterThanFilter() {
+    DataSchema inputSchema = new DataSchema(new String[]{"int0", "int1"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, 2}, new Object[]{3, 2}, new Object[]{1, 1}));
+    RexExpression.FunctionCall greaterThan =
+        new RexExpression.FunctionCall(SqlKind.GREATER_THAN, FieldSpec.DataType.BOOLEAN, "greaterThan",
+            ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    List<Object[]> expectedResult = ImmutableList.of(new Object[]{3, 2});
+    Assert.assertEquals(result.size(), expectedResult.size());
+    Assert.assertEquals(result.get(0), expectedResult.get(0));
+  }
+
+  @Test
+  public void shouldHandleBooleanFunction() {
+    DataSchema inputSchema = new DataSchema(new String[]{"string1"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{"starTree"}, new Object[]{"treeStar"}));
+    RexExpression.FunctionCall startsWith =
+        new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWith",
+            ImmutableList.of(new RexExpression.InputRef(0),
+                new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    List<Object[]> expectedResult = ImmutableList.of(new Object[]{"starTree"});
+    Assert.assertEquals(result.size(), expectedResult.size());
+    Assert.assertEquals(result.get(0), expectedResult.get(0));
+  }
+
+  @Test(expectedExceptions = NullPointerException.class, expectedExceptionsMessageRegExp = ".*Cannot find function "
+      + "with Name: startsWithError.*")
+  public void shouldThrowOnUnfoundFunction() {
+    DataSchema inputSchema = new DataSchema(new String[]{"string1"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{"starTree"}, new Object[]{"treeStar"}));
+    RexExpression.FunctionCall startsWith =
+        new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWithError",
+            ImmutableList.of(new RexExpression.InputRef(0),
+                new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith);
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 86b65a1686..0537c67ca9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
 
 import java.util.Arrays;
 import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -57,4 +58,8 @@ public class OperatorTestUtil {
   public static DataSchema getDataSchema(String operatorName) {
     return MOCK_OPERATOR_FACTORY.getDataSchema(operatorName);
   }
+
+  public static TransferableBlock block(DataSchema schema, Object[]... rows) {
+    return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org