You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/15 19:39:39 UTC
[pinot] branch master updated: [multistage] [testing] Filter Operator Unit Test (#9792)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7b8c8a0fe8 [multistage] [testing] Filter Operator Unit Test (#9792)
7b8c8a0fe8 is described below
commit 7b8c8a0fe8fca07d0cad66d0bfa4d02d31dc74c6
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Tue Nov 15 11:39:33 2022 -0800
[multistage] [testing] Filter Operator Unit Test (#9792)
---
.../pinot/query/planner/logical/RexExpression.java | 6 +
.../pinot/query/planner/logical/StagePlanner.java | 2 +-
.../query/runtime/operator/FilterOperator.java | 14 +-
.../runtime/operator/operands/FilterOperand.java | 52 ++--
.../query/runtime/operator/FilterOperatorTest.java | 288 +++++++++++++++++++++
.../query/runtime/operator/OperatorTestUtil.java | 5 +
6 files changed, 345 insertions(+), 22 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index 3e964e77d0..e9a4a99679 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -203,12 +203,18 @@ public interface RexExpression {
}
class FunctionCall implements RexExpression {
+ // the underlying SQL operator kind of this function.
+ // It can be either a standard SQL operator or an extended function kind.
+ // @see #SqlKind.FUNCTION, #SqlKind.OTHER, #SqlKind.OTHER_FUNCTION
@ProtoProperties
private SqlKind _sqlKind;
+ // the return data type of the function.
@ProtoProperties
private FieldSpec.DataType _dataType;
+ // the name of the SQL function. For standard SqlKind it should match the SqlKind ENUM name.
@ProtoProperties
private String _functionName;
+ // the list of RexExpressions that represents the operands to the function.
@ProtoProperties
private List<RexExpression> _functionOperands;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 323e7f506b..2d61856c85 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -41,7 +41,7 @@ import org.apache.pinot.query.routing.WorkerManager;
* This class is non-threadsafe. Do not reuse the stage planner for multiple query plans.
*/
public class StagePlanner {
- private final PlannerContext _plannerContext;
+ private final PlannerContext _plannerContext; // DO NOT REMOVE.
private final WorkerManager _workerManager;
private int _stageIdCounter;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index f639020b25..66956a95ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -30,7 +30,19 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
-
+/*
+ FilterOperator apply filter on rows from upstreamOperator.
+ There are three types of filter operands
+ 1) inputRef
+ 2) Literal
+ 3) FunctionOperand
+ All three types' result has to be a boolean to be used to filter rows.
+ FunctionOperand supports,
+ 1) AND, OR, NOT functions to combine operands.
+ 2) Binary Operand: equals, notEquals, greaterThan, greaterThanOrEqual, lessThan, lessThanOrEqual
+ 3) All boolean scalar functions we have that take tranformOperand.
+ Note: Scalar functions are the ones we have in v1 engine and only do function name and arg # matching.
+ */
public class FilterOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "FILTER";
private final Operator<TransferableBlock> _upstreamOperator;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
index 3c58d1710a..b152b33b88 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
@@ -41,69 +41,73 @@ public abstract class FilterOperand extends TransformOperand {
}
}
- public static FilterOperand toFilterOperand(RexExpression.Literal literal) {
+ private static FilterOperand toFilterOperand(RexExpression.Literal literal) {
return new BooleanLiteral(literal);
}
- public static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, DataSchema dataSchema) {
+ private static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, DataSchema dataSchema) {
return new BooleanInputRef(inputRef, dataSchema);
}
- public static FilterOperand toFilterOperand(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
-
+ private static FilterOperand toFilterOperand(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
+ int operandSize = functionCall.getFunctionOperands().size();
+ // TODO: Move these functions out of this class.
switch (OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
case "AND":
+ Preconditions.checkState(operandSize >= 2, "AND takes >=2 argument, passed in argument size:" + operandSize);
return new And(functionCall.getFunctionOperands(), dataSchema);
case "OR":
+ Preconditions.checkState(operandSize >= 2, "OR takes >=2 argument, passed in argument size:" + operandSize);
return new Or(functionCall.getFunctionOperands(), dataSchema);
case "NOT":
+ Preconditions.checkState(operandSize == 1, "NOT takes one argument, passed in argument size:" + operandSize);
return new Not(toFilterOperand(functionCall.getFunctionOperands().get(0), dataSchema));
case "equals":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) == 0;
+ return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ == 0;
}
};
case "notEquals":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) != 0;
+ return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ != 0;
}
};
case "greaterThan":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) > 0;
+ return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ > 0;
}
};
case "greaterThanOrEqual":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) >= 0;
+ return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ >= 0;
}
};
case "lessThan":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) < 0;
+ return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ < 0;
}
};
case "lessThanOrEqual":
return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
@Override
public Boolean apply(Object[] row) {
- return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(
- _resultType.convert(_rhs.apply(row))) <= 0;
+ return ((Comparable) _resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
+ <= 0;
}
};
default:
@@ -119,7 +123,8 @@ public abstract class FilterOperand extends TransformOperand {
public BooleanFunction(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
FunctionOperand func = (FunctionOperand) TransformOperand.toTransformOperand(functionCall, dataSchema);
- Preconditions.checkState(func.getResultType() == DataSchema.ColumnDataType.BOOLEAN);
+ Preconditions.checkState(func.getResultType() == DataSchema.ColumnDataType.BOOLEAN,
+ "Expecting boolean result type but got type:" + func.getResultType());
_func = func;
}
@@ -133,8 +138,9 @@ public abstract class FilterOperand extends TransformOperand {
private final RexExpression.InputRef _inputRef;
public BooleanInputRef(RexExpression.InputRef inputRef, DataSchema dataSchema) {
- Preconditions.checkState(dataSchema.getColumnDataType(inputRef.getIndex())
- == DataSchema.ColumnDataType.BOOLEAN);
+ DataSchema.ColumnDataType inputType = dataSchema.getColumnDataType(inputRef.getIndex());
+ Preconditions.checkState(inputType == DataSchema.ColumnDataType.BOOLEAN,
+ "Input has to be boolean type but got type:" + inputType);
_inputRef = inputRef;
}
@@ -148,7 +154,8 @@ public abstract class FilterOperand extends TransformOperand {
private final Object _literalValue;
public BooleanLiteral(RexExpression.Literal literal) {
- Preconditions.checkState(literal.getDataType() == FieldSpec.DataType.BOOLEAN);
+ Preconditions.checkState(literal.getDataType() == FieldSpec.DataType.BOOLEAN,
+ "Only boolean literal is supported as filter, but got type:" + literal.getDataType());
_literalValue = literal.getValue();
}
@@ -160,6 +167,7 @@ public abstract class FilterOperand extends TransformOperand {
private static class And extends FilterOperand {
List<FilterOperand> _childOperands;
+
public And(List<RexExpression> childExprs, DataSchema dataSchema) {
_childOperands = new ArrayList<>(childExprs.size());
for (RexExpression childExpr : childExprs) {
@@ -180,6 +188,7 @@ public abstract class FilterOperand extends TransformOperand {
private static class Or extends FilterOperand {
List<FilterOperand> _childOperands;
+
public Or(List<RexExpression> childExprs, DataSchema dataSchema) {
_childOperands = new ArrayList<>(childExprs.size());
for (RexExpression childExpr : childExprs) {
@@ -200,6 +209,7 @@ public abstract class FilterOperand extends TransformOperand {
private static class Not extends FilterOperand {
FilterOperand _childOperand;
+
public Not(FilterOperand childOperand) {
_childOperand = childOperand;
}
@@ -216,6 +226,8 @@ public abstract class FilterOperand extends TransformOperand {
protected final DataSchema.ColumnDataType _resultType;
public Predicate(List<RexExpression> functionOperands, DataSchema dataSchema) {
+ Preconditions.checkState(functionOperands.size() == 2,
+ "Expected 2 function ops for Predicate but got:" + functionOperands.size());
_lhs = TransformOperand.toTransformOperand(functionOperands.get(0), dataSchema);
_rhs = TransformOperand.toTransformOperand(functionOperands.get(1), dataSchema);
if (_lhs._resultType != null && _lhs._resultType != DataSchema.ColumnDataType.OBJECT) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
new file mode 100644
index 0000000000..7b1f5bce5f
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class FilterOperatorTest {
+ private AutoCloseable _mocks;
+ @Mock
+ private Operator<TransferableBlock> _upstreamOperator;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
+ @Test
+ public void shouldPropagateUpstreamErrorBlock() {
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("filterError")));
+ RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN
+ });
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+ TransferableBlock errorBlock = op.getNextBlock();
+ Assert.assertTrue(errorBlock.isErrorBlock());
+ DataBlock error = errorBlock.getDataBlock();
+ Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
+ }
+
+ @Test
+ public void shouldPropagateUpstreamEOS() {
+ RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertTrue(dataBlock.isEndOfStreamBlock());
+ }
+
+ @Test
+ public void shouldPropagateUpstreamNoop() {
+ RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertTrue(dataBlock.isNoOpBlock());
+ }
+
+ @Test
+ public void shouldHandleTrueBooleanLiteralFilter() {
+ RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new Object[]{1}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get(0)[0], 0);
+ Assert.assertEquals(result.get(1)[0], 1);
+ }
+
+ @Test
+ public void shouldHandleFalseBooleanLiteralFilter() {
+ RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, false);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*boolean literal.*")
+ public void shouldThrowOnNonBooleanTypeBooleanLiteral() {
+ RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "false");
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Input has to be "
+ + "boolean type.*")
+ public void shouldThrowOnNonBooleanTypeInputRef() {
+ RexExpression ref0 = new RexExpression.InputRef(0);
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0);
+ }
+
+ @Test
+ public void shouldHandleBooleanInputRef() {
+ RexExpression ref1 = new RexExpression.InputRef(1);
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol", "boolCol"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, new Object[]{2, false}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0)[0], 1);
+ Assert.assertEquals(result.get(0)[1], true);
+ }
+
+ @Test
+ public void shouldHandleAndFilter() {
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false},
+ new Object[]{true, false}));
+ RexExpression.FunctionCall andCall = new RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND",
+ ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
+
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0)[0], true);
+ Assert.assertEquals(result.get(0)[1], true);
+ }
+
+ @Test
+ public void shouldHandleOrFilter() {
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false},
+ new Object[]{true, false}));
+ RexExpression.FunctionCall orCall = new RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR",
+ ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
+
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get(0)[0], true);
+ Assert.assertEquals(result.get(0)[1], true);
+ Assert.assertEquals(result.get(1)[0], true);
+ Assert.assertEquals(result.get(1)[1], false);
+ }
+
+ @Test
+ public void shouldHandleNotFilter() {
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol0", "boolCol1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock()).thenReturn(
+ OperatorTestUtil.block(inputSchema, new Object[]{true, true}, new Object[]{false, false},
+ new Object[]{true, false}));
+ RexExpression.FunctionCall notCall = new RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT",
+ ImmutableList.of(new RexExpression.InputRef(0)));
+
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0)[0], false);
+ Assert.assertEquals(result.get(0)[1], false);
+ }
+
+ @Test
+ public void shouldHandleGreaterThanFilter() {
+ DataSchema inputSchema = new DataSchema(new String[]{"int0", "int1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, 2}, new Object[]{3, 2}, new Object[]{1, 1}));
+ RexExpression.FunctionCall greaterThan =
+ new RexExpression.FunctionCall(SqlKind.GREATER_THAN, FieldSpec.DataType.BOOLEAN, "greaterThan",
+ ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ List<Object[]> expectedResult = ImmutableList.of(new Object[]{3, 2});
+ Assert.assertEquals(result.size(), expectedResult.size());
+ Assert.assertEquals(result.get(0), expectedResult.get(0));
+ }
+
+ @Test
+ public void shouldHandleBooleanFunction() {
+ DataSchema inputSchema = new DataSchema(new String[]{"string1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{"starTree"}, new Object[]{"treeStar"}));
+ RexExpression.FunctionCall startsWith =
+ new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWith",
+ ImmutableList.of(new RexExpression.InputRef(0),
+ new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ List<Object[]> expectedResult = ImmutableList.of(new Object[]{"starTree"});
+ Assert.assertEquals(result.size(), expectedResult.size());
+ Assert.assertEquals(result.get(0), expectedResult.get(0));
+ }
+
+ @Test(expectedExceptions = NullPointerException.class, expectedExceptionsMessageRegExp = ".*Cannot find function "
+ + "with Name: startsWithError.*")
+ public void shouldThrowOnUnfoundFunction() {
+ DataSchema inputSchema = new DataSchema(new String[]{"string1"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{"starTree"}, new Object[]{"treeStar"}));
+ RexExpression.FunctionCall startsWith =
+ new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWithError",
+ ImmutableList.of(new RexExpression.InputRef(0),
+ new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith);
+ }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 86b65a1686..0537c67ca9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
import java.util.Arrays;
import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -57,4 +58,8 @@ public class OperatorTestUtil {
public static DataSchema getDataSchema(String operatorName) {
return MOCK_OPERATOR_FACTORY.getDataSchema(operatorName);
}
+
+ public static TransferableBlock block(DataSchema schema, Object[]... rows) {
+ return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org