You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/05/03 20:23:59 UTC
[pinot] branch master updated: [multistage] UNION/INTERSECT/EXCEPT implementation (#10622)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1c4d0fe0a3 [multistage] UNION/INTERSECT/EXCEPT implementation (#10622)
1c4d0fe0a3 is described below
commit 1c4d0fe0a3d41a9385d6de04829c282a484aaf55
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Wed May 3 13:23:52 2023 -0700
[multistage] UNION/INTERSECT/EXCEPT implementation (#10622)
---
.../calcite/rel/rules/PinotQueryRuleSets.java | 3 +-
.../rules/PinotSetOpExchangeNodeInsertRule.java | 87 ++++++++++++
.../src/test/resources/queries/SetOpPlans.json | 74 ++++++----
.../query/runtime/operator/IntersectOperator.java | 49 +++++++
.../query/runtime/operator/MinusOperator.java | 49 +++++++
.../pinot/query/runtime/operator/SetOperator.java | 154 +++++++++++++++++++++
.../query/runtime/operator/UnionOperator.java | 61 ++++++++
.../query/runtime/operator/utils/SortUtils.java | 17 +++
.../query/runtime/plan/PhysicalPlanVisitor.java | 25 +++-
.../runtime/plan/ServerRequestPlanVisitor.java | 5 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 55 ++++++--
.../runtime/operator/IntersectOperatorTest.java | 119 ++++++++++++++++
.../query/runtime/operator/MinusOperatorTest.java | 121 ++++++++++++++++
.../query/runtime/operator/UnionOperatorTest.java | 91 ++++++++++++
.../src/test/resources/queries/SetOps.json | 51 +++++++
15 files changed, 915 insertions(+), 46 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index b713b9c82c..d728bb758d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -122,6 +122,7 @@ public class PinotQueryRuleSets {
PinotJoinExchangeNodeInsertRule.INSTANCE,
PinotAggregateExchangeNodeInsertRule.INSTANCE,
- PinotWindowExchangeNodeInsertRule.INSTANCE
+ PinotWindowExchangeNodeInsertRule.INSTANCE,
+ PinotSetOpExchangeNodeInsertRule.INSTANCE
);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
new file mode 100644
index 0000000000..16227a1f83
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Special rule for Pinot: this rule always wraps every input of a SetOp node in an exchange that is
+ * hash-distributed on all fields of the SetOp row type.
+ */
+public class PinotSetOpExchangeNodeInsertRule extends RelOptRule {
+ public static final PinotSetOpExchangeNodeInsertRule INSTANCE =
+ new PinotSetOpExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+ public PinotSetOpExchangeNodeInsertRule(RelBuilderFactory factory) {
+ super(operand(SetOp.class, any()), factory, null);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ if (call.rels.length < 1) {
+ return false;
+ }
+ if (call.rel(0) instanceof SetOp) {
+ SetOp setOp = call.rel(0);
+ for (RelNode input : setOp.getInputs()) {
+ if (PinotRuleUtils.isExchange(input)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ SetOp setOp = call.rel(0);
+ List<RelNode> newInputs = new ArrayList<>();
+ List<Integer> hashFields =
+ IntStream.range(0, setOp.getRowType().getFieldCount()).boxed().collect(Collectors.toCollection(ArrayList::new));
+ for (RelNode input : setOp.getInputs()) {
+ RelNode exchange = LogicalExchange.create(input, RelDistributions.hash(hashFields));
+ newInputs.add(exchange);
+ }
+ SetOp newSetOpNode;
+ if (setOp instanceof LogicalUnion) {
+ newSetOpNode = new LogicalUnion(setOp.getCluster(), setOp.getTraitSet(), newInputs, setOp.all);
+ } else if (setOp instanceof LogicalIntersect) {
+ newSetOpNode = new LogicalIntersect(setOp.getCluster(), setOp.getTraitSet(), newInputs, setOp.all);
+ } else if (setOp instanceof LogicalMinus) {
+ newSetOpNode = new LogicalMinus(setOp.getCluster(), setOp.getTraitSet(), newInputs, setOp.all);
+ } else {
+ throw new UnsupportedOperationException("Unsupported set op node: " + setOp);
+ }
+ call.transformTo(newSetOpNode);
+ }
+}
diff --git a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
index 22bb56a78d..c131fecb49 100644
--- a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
@@ -7,10 +7,12 @@
"output": [
"Execution Plan",
"\nLogicalUnion(all=[true])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[a]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[b]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[b]])",
"\n"
]
},
@@ -20,13 +22,17 @@
"output": [
"Execution Plan",
"\nLogicalUnion(all=[true])",
- "\n LogicalUnion(all=[true])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalUnion(all=[true])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[b]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[b]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[c]])",
+ "\n LogicalTableScan(table=[[c]])",
"\n"
]
},
@@ -39,13 +45,17 @@
"\n LogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 1}])",
"\n LogicalUnion(all=[true])",
- "\n LogicalUnion(all=[true])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalUnion(all=[true])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[b]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[b]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[c]])",
+ "\n LogicalTableScan(table=[[c]])",
"\n"
]
},
@@ -55,13 +65,17 @@
"output": [
"Execution Plan",
"\nLogicalIntersect(all=[false])",
- "\n LogicalIntersect(all=[false])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalIntersect(all=[false])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[b]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[b]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[c]])",
+ "\n LogicalTableScan(table=[[c]])",
"\n"
]
},
@@ -71,13 +85,17 @@
"output": [
"Execution Plan",
"\nLogicalMinus(all=[false])",
- "\n LogicalMinus(all=[false])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalMinus(all=[false])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[a]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col1=[$0], col2=[$1])",
+ "\n LogicalTableScan(table=[[b]])",
+ "\n LogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[b]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalTableScan(table=[[c]])",
+ "\n LogicalTableScan(table=[[c]])",
"\n"
]
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/IntersectOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/IntersectOperator.java
new file mode 100644
index 0000000000..b0ba3c6fa3
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/IntersectOperator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ * Intersect operator.
+ */
+public class IntersectOperator extends SetOperator {
+ private static final String EXPLAIN_NAME = "INTERSECT";
+
+ public IntersectOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
+ DataSchema dataSchema) {
+ super(opChainExecutionContext, upstreamOperators, dataSchema);
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected boolean handleRowMatched(Object[] row) {
+ return _rightRowSet.remove(new Record(row));
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MinusOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MinusOperator.java
new file mode 100644
index 0000000000..3415bfb3fc
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MinusOperator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ * Minus/Except operator.
+ */
+public class MinusOperator extends SetOperator {
+ private static final String EXPLAIN_NAME = "MINUS";
+
+ public MinusOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
+ DataSchema dataSchema) {
+ super(opChainExecutionContext, upstreamOperators, dataSchema);
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected boolean handleRowMatched(Object[] row) {
+ return _rightRowSet.add(new Record(row));
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
new file mode 100644
index 0000000000..40667abc3e
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ExplainPlanRows;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * Set operator, which supports UNION, INTERSECT and EXCEPT.
+ * This has two child operators, and the left child operator is the one that is used to construct the result.
+ * The right child operator is used to construct a set of rows that are used to filter the left child operator.
+ * The right child operator is consumed in a blocking manner, and the left child operator is consumed in a non-blocking
+ * manner.
+ * UnionOperator: overrides getNextBlock() and returns the first non-end-of-stream block offered by its child
+ * operators; it does not build the right row set.
+ */
+public abstract class SetOperator extends MultiStageOperator {
+ protected final Set<Record> _rightRowSet;
+
+ private final List<MultiStageOperator> _upstreamOperators;
+ private final MultiStageOperator _leftChildOperator;
+ private final MultiStageOperator _rightChildOperator;
+
+ private final List<Object[]> _resultRowBlock;
+
+ private final DataSchema _dataSchema;
+ private TransferableBlock _currentLeftBlock;
+ private Iterator<Object[]> _currentLeftIterator;
+
+ private boolean _isRightBlockConsumed;
+
+ public SetOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
+ DataSchema dataSchema) {
+ super(opChainExecutionContext);
+ _dataSchema = dataSchema;
+ _upstreamOperators = upstreamOperators;
+ _leftChildOperator = getChildOperators().get(0);
+ _rightChildOperator = getChildOperators().get(1);
+ _rightRowSet = new HashSet<>();
+ _resultRowBlock = new ArrayList<>();
+ }
+
+ @Override
+ public List<MultiStageOperator> getChildOperators() {
+ return _upstreamOperators;
+ }
+
+ @Override
+ public void prepareForExplainPlan(ExplainPlanRows explainPlanRows) {
+ super.prepareForExplainPlan(explainPlanRows);
+ }
+
+ @Override
+ public void explainPlan(ExplainPlanRows explainPlanRows, int[] globalId, int parentId) {
+ super.explainPlan(explainPlanRows, globalId, parentId);
+ }
+
+ @Override
+ public IndexSegment getIndexSegment() {
+ return super.getIndexSegment();
+ }
+
+ @Override
+ public ExecutionStatistics getExecutionStatistics() {
+ return super.getExecutionStatistics();
+ }
+
+ @Override
+ protected TransferableBlock getNextBlock() {
+ // A blocking call to construct a set with all the right side rows.
+ if (!_isRightBlockConsumed) {
+ constructRightBlockSet();
+ }
+ return constructResultBlockSet();
+ }
+
+ protected void constructRightBlockSet() {
+ TransferableBlock block = _rightChildOperator.nextBlock();
+ while (!block.isEndOfStreamBlock()) {
+ if (block.getType() != DataBlock.Type.METADATA) {
+ for (Object[] row : block.getContainer()) {
+ _rightRowSet.add(new Record(row));
+ }
+ }
+ block = _rightChildOperator.nextBlock();
+ }
+ _isRightBlockConsumed = true;
+ }
+
+ protected TransferableBlock constructResultBlockSet() {
+ _resultRowBlock.clear();
+ // First time initialization.
+ if (_currentLeftBlock == null) {
+ _currentLeftBlock = _leftChildOperator.nextBlock();
+ }
+ while (!_currentLeftBlock.isEndOfStreamBlock()) {
+ if (_currentLeftBlock.getType() == DataBlock.Type.METADATA) {
+ _currentLeftBlock = _leftChildOperator.nextBlock();
+ continue;
+ }
+ _currentLeftIterator = _currentLeftBlock.getContainer().iterator();
+ while (_currentLeftIterator.hasNext()) {
+ Object[] row = _currentLeftIterator.next();
+ if (handleRowMatched(row)) {
+ _resultRowBlock.add(row);
+ if (_resultRowBlock.size() == SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY) {
+ return new TransferableBlock(_resultRowBlock, _dataSchema, DataBlock.Type.ROW);
+ }
+ }
+ }
+ _currentLeftBlock = _leftChildOperator.nextBlock();
+ }
+ if (!_resultRowBlock.isEmpty()) {
+ return new TransferableBlock(_resultRowBlock, _dataSchema, DataBlock.Type.ROW);
+ }
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+
+ /**
+ * Returns true if the given row is matched, i.e. it should be added to the result block.
+ * Also updates the right row set based on the Operator.
+ * @param row a row read from the left child operator
+ * @return true if the row is matched.
+ */
+ protected abstract boolean handleRowMatched(Object[] row);
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
new file mode 100644
index 0000000000..4f69b575b1
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ * Union operator for UNION ALL queries.
+ */
+public class UnionOperator extends SetOperator {
+ private static final String EXPLAIN_NAME = "UNION";
+
+ public UnionOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
+ DataSchema dataSchema) {
+ super(opChainExecutionContext, upstreamOperators, dataSchema);
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected TransferableBlock getNextBlock() {
+ for (MultiStageOperator upstreamOperator : getChildOperators()) {
+ TransferableBlock block = upstreamOperator.nextBlock();
+ if (!block.isEndOfStreamBlock()) {
+ return block;
+ }
+ }
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+
+ @Override
+ protected boolean handleRowMatched(Object[] row) {
+ throw new UnsupportedOperationException("Union operator does not support row matching");
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
index adc00d9c78..1d2c42a193 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
@@ -59,6 +59,23 @@ public class SortUtils {
}
}
+ /**
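+ * Sort comparator for use with priority queues. This overload compares rows on every column of the data schema,
+ * in column order, using a multiplier of 1 for each column.
+ * @param dataSchema data schema whose column data types determine the columns to compare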
+ */
+ public SortComparator(DataSchema dataSchema) {
+ DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+ _size = columnDataTypes.length;
+ _valueIndices = new int[_size];
+ _multipliers = new int[_size];
+ _useDoubleComparison = new boolean[_size];
+ for (int i = 0; i < _size; i++) {
+ _valueIndices[i] = i;
+ _multipliers[i] = 1;
+ _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
+ }
+ }
+
@Override
public int compare(Object[] o1, Object[] o2) {
for (int i = 0; i < _size; i++) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index ecf5c9672f..4ff613e3ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.query.runtime.plan;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
@@ -34,14 +36,17 @@ import org.apache.pinot.query.planner.stage.WindowNode;
import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.FilterOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
+import org.apache.pinot.query.runtime.operator.IntersectOperator;
import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.MinusOperator;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.operator.SortOperator;
import org.apache.pinot.query.runtime.operator.SortedMailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.TransformOperator;
+import org.apache.pinot.query.runtime.operator.UnionOperator;
import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
@@ -105,8 +110,24 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitSetOp(SetOpNode setOpNode, PlanRequestContext context) {
- throw new UnsupportedOperationException(
- "Stage node of type SetOpNode: " + setOpNode.getSetOpType() + " is not supported!");
+ List<MultiStageOperator> inputs = new ArrayList<>();
+ for (StageNode input : setOpNode.getInputs()) {
+ MultiStageOperator visited = input.visit(this, context);
+ inputs.add(visited);
+ }
+ switch (setOpNode.getSetOpType()) {
+ case UNION:
+ return new UnionOperator(context.getOpChainExecutionContext(), inputs,
+ setOpNode.getInputs().get(0).getDataSchema());
+ case INTERSECT:
+ return new IntersectOperator(context.getOpChainExecutionContext(), inputs,
+ setOpNode.getInputs().get(0).getDataSchema());
+ case MINUS:
+ return new MinusOperator(context.getOpChainExecutionContext(), inputs,
+ setOpNode.getInputs().get(0).getDataSchema());
+ default:
+ throw new IllegalStateException();
+ }
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index ef2984e4ff..c2c01388d7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -184,8 +184,9 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
}
@Override
- public Void visitSetOp(SetOpNode setOpNode, ServerPlanRequestContext context) {
- throw new UnsupportedOperationException("SetOp not yet supported!");
+ public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ return null;
}
@Override
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 5e45b765c8..22d26fe4fb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -234,31 +234,60 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
return ((Timestamp) l).compareTo((Timestamp) r);
} else if (l instanceof int[]) {
int[] larray = (int[]) l;
- Object[] rarray;
try {
- rarray = (Object[]) ((JdbcArray) r).getArray();
+ if (r instanceof JdbcArray) {
+ Object[] rarray = (Object[]) ((JdbcArray) r).getArray();
+ for (int idx = 0; idx < larray.length; idx++) {
+ Number relement = (Number) rarray[idx];
+ if (larray[idx] != relement.intValue()) {
+ return -1;
+ }
+ }
+ } else {
+ int[] rarray = (int[]) r;
+ for (int idx = 0; idx < larray.length; idx++) {
+ if (larray[idx] != rarray[idx]) {
+ return -1;
+ }
+ }
+ }
} catch (SQLException e) {
throw new RuntimeException(e);
}
- for (int idx = 0; idx < larray.length; idx++) {
- Number relement = (Number) rarray[idx];
- if (larray[idx] != relement.intValue()) {
- return -1;
- }
- }
return 0;
} else if (l instanceof String[]) {
String[] larray = (String[]) l;
- Object[] rarray;
try {
- rarray = (Object[]) ((JdbcArray) r).getArray();
+ if (r instanceof JdbcArray) {
+ Object[] rarray = (Object[]) ((JdbcArray) r).getArray();
+ for (int idx = 0; idx < larray.length; idx++) {
+ if (!larray[idx].equals(rarray[idx])) {
+ return -1;
+ }
+ }
+ } else {
+ String[] rarray = (String[]) r;
+ for (int idx = 0; idx < larray.length; idx++) {
+ if (!larray[idx].equals(rarray[idx])) {
+ return -1;
+ }
+ }
+ }
} catch (SQLException e) {
throw new RuntimeException(e);
}
- for (int idx = 0; idx < larray.length; idx++) {
- if (!larray[idx].equals(rarray[idx])) {
- return -1;
+ return 0;
+ } else if (l instanceof JdbcArray) {
+ try {
+ Object[] larray = (Object[]) ((JdbcArray) l).getArray();
+ Object[] rarray = (Object[]) ((JdbcArray) r).getArray();
+ for (int idx = 0; idx < larray.length; idx++) {
+ if (!larray[idx].equals(rarray[idx])) {
+ return -1;
+ }
}
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
}
return 0;
} else {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
new file mode 100644
index 0000000000..04c14caea1
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link IntersectOperator}, which is fed rows through two mocked upstream operators.
+ *
+ * <p>Each test stubs the left and right inputs to return one data block followed by an end-of-stream block and
+ * then checks the rows of the first ROW block the operator produces.
+ */
+public class IntersectOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private MultiStageOperator _leftOperator;
+
+  @Mock
+  private MultiStageOperator _rightOperator;
+
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * Rows present in both inputs ((1, "AA") and (2, "BB")) are expected in the output; rows present on only one
+   * side are not.
+   */
+  @Test
+  public void testIntersectOperator() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{4, "DD"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    IntersectOperator intersectOperator =
+        new IntersectOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+
+    assertRows(readFirstRowBlock(intersectOperator), Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "BB"}));
+  }
+
+  /**
+   * Same inputs as {@link #testIntersectOperator()} but with every row repeated on both sides; each common row is
+   * still expected exactly once.
+   */
+  @Test
+  public void testDedup() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"},
+            new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{4, "DD"},
+                new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{4, "DD"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    IntersectOperator intersectOperator =
+        new IntersectOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+
+    assertRows(readFirstRowBlock(intersectOperator), Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "BB"}));
+  }
+
+  /**
+   * Polls the operator until it returns a ROW block and returns that block's rows.
+   *
+   * <p>The test fails as soon as an end-of-stream block is seen before any ROW block, so an operator that never
+   * emits rows makes the test fail instead of looping forever.
+   */
+  private static List<Object[]> readFirstRowBlock(IntersectOperator operator) {
+    TransferableBlock block = operator.nextBlock();
+    while (block.getType() != DataBlock.Type.ROW) {
+      Assert.assertFalse(block.isEndOfStreamBlock(), "End of stream reached before any ROW block");
+      block = operator.nextBlock();
+    }
+    return block.getContainer();
+  }
+
+  /** Asserts that both lists have the same size and equal rows at every position. */
+  private static void assertRows(List<Object[]> actualRows, List<Object[]> expectedRows) {
+    Assert.assertEquals(actualRows.size(), expectedRows.size());
+    for (int i = 0; i < actualRows.size(); i++) {
+      Assert.assertEquals(actualRows.get(i), expectedRows.get(i));
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
new file mode 100644
index 0000000000..53e81612ad
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link MinusOperator}, which is fed rows through two mocked upstream operators.
+ *
+ * <p>Each test stubs the left and right inputs to return one data block followed by an end-of-stream block and
+ * then checks the rows of the first ROW block the operator produces.
+ */
+public class MinusOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private MultiStageOperator _leftOperator;
+
+  @Mock
+  private MultiStageOperator _rightOperator;
+
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * Left rows that also appear on the right ((1, "AA") and (2, "BB")) are expected to be removed, leaving
+   * (3, "CC") and (4, "DD").
+   */
+  @Test
+  public void testExceptOperator() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"},
+            new Object[]{4, "DD"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{5, "EE"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    MinusOperator minusOperator =
+        new MinusOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+
+    assertRows(readFirstRowBlock(minusOperator), Arrays.asList(new Object[]{3, "CC"}, new Object[]{4, "DD"}));
+  }
+
+  /**
+   * Same inputs as {@link #testExceptOperator()} but with every row repeated on both sides; each surviving row is
+   * still expected exactly once.
+   */
+  @Test
+  public void testDedup() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"},
+            new Object[]{4, "DD"}, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"},
+            new Object[]{4, "DD"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{5, "EE"},
+                new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{5, "EE"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    MinusOperator minusOperator =
+        new MinusOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+
+    assertRows(readFirstRowBlock(minusOperator), Arrays.asList(new Object[]{3, "CC"}, new Object[]{4, "DD"}));
+  }
+
+  /**
+   * Polls the operator until it returns a ROW block and returns that block's rows.
+   *
+   * <p>The test fails as soon as an end-of-stream block is seen before any ROW block, so an operator that never
+   * emits rows makes the test fail instead of looping forever.
+   */
+  private static List<Object[]> readFirstRowBlock(MinusOperator operator) {
+    TransferableBlock block = operator.nextBlock();
+    while (block.getType() != DataBlock.Type.ROW) {
+      Assert.assertFalse(block.isEndOfStreamBlock(), "End of stream reached before any ROW block");
+      block = operator.nextBlock();
+    }
+    return block.getContainer();
+  }
+
+  /** Asserts that both lists have the same size and equal rows at every position. */
+  private static void assertRows(List<Object[]> actualRows, List<Object[]> expectedRows) {
+    Assert.assertEquals(actualRows.size(), expectedRows.size());
+    for (int i = 0; i < actualRows.size(); i++) {
+      Assert.assertEquals(actualRows.get(i), expectedRows.get(i));
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
new file mode 100644
index 0000000000..16e8c46874
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit test for {@link UnionOperator}, which is fed rows through two mocked upstream operators.
+ */
+public class UnionOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private MultiStageOperator _leftOperator;
+
+  @Mock
+  private MultiStageOperator _rightOperator;
+
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    String mockAddress = new VirtualServerAddress("mock", 80, 0).toString();
+    Mockito.when(_serverAddress.toString()).thenReturn(mockAddress);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * Drains the operator until end of stream and expects the two left rows followed by the three right rows.
+   */
+  @Test
+  public void testUnionOperator() {
+    String[] columnNames = {"int_col", "string_col"};
+    DataSchema.ColumnDataType[] columnTypes = {DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING};
+    DataSchema rowSchema = new DataSchema(columnNames, columnTypes);
+
+    // Each upstream yields a single data block and then signals end of stream.
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(rowSchema, new Object[]{1, "AA"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(
+            OperatorTestUtil.block(rowSchema, new Object[]{3, "aa"}, new Object[]{4, "bb"}, new Object[]{5, "cc"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    UnionOperator unionOperator = new UnionOperator(OperatorTestUtil.getDefaultContext(),
+        ImmutableList.of(_leftOperator, _rightOperator), rowSchema);
+
+    // Accumulate the rows of every block returned before the end-of-stream block.
+    List<Object[]> collectedRows = new ArrayList<>();
+    for (TransferableBlock block = unionOperator.nextBlock(); !block.isEndOfStreamBlock();
+        block = unionOperator.nextBlock()) {
+      collectedRows.addAll(block.getContainer());
+    }
+
+    List<Object[]> expectedRows = Arrays.asList(
+        new Object[]{1, "AA"},
+        new Object[]{2, "BB"},
+        new Object[]{3, "aa"},
+        new Object[]{4, "bb"},
+        new Object[]{5, "cc"});
+    Assert.assertEquals(collectedRows.size(), expectedRows.size());
+    int rowIdx = 0;
+    while (rowIdx < collectedRows.size()) {
+      Assert.assertEquals(collectedRows.get(rowIdx), expectedRows.get(rowIdx));
+      rowIdx++;
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOps.json b/pinot-query-runtime/src/test/resources/queries/SetOps.json
new file mode 100644
index 0000000000..4ae8a800e6
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/SetOps.json
@@ -0,0 +1,51 @@
+{
+ "set_op_test": {
+ "tables": {
+ "tbl1": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "longCol", "type": "LONG"},
+ {"name": "floatCol", "type": "FLOAT"},
+ {"name": "doubleCol", "type": "DOUBLE"},
+ {"name": "strCol", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, 8, 3.0, 5.176518e16, "lyons"],
+ [2, 9, 4.0, 4.608155e11, "onan"],
+ [3, 14, 5.0, 1.249261e11, "rudvalis"],
+ [4, 21, 6.0, 8.677557e19, "janko"],
+ [1, 41, 2.0, 4.15478e33, "baby"],
+ [2, 46, 1.0, 8.08017e53, "monster"]
+ ]
+ },
+ "tbl2": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "strCol", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, "foo"],
+ [2, "bar"]
+ ]
+ },
+ "tbl3": {
+ "schema":[
+ {"name": "intArrayCol", "type": "INT", "isSingleValue": false},
+ {"name": "strArrayCol", "type": "STRING", "isSingleValue": false}
+ ],
+ "inputs": [
+ [[1, 10], ["foo1", "foo2"]]
+ ]
+ }
+ },
+ "queries": [
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 MINUS SELECT intCol FROM {tbl1} WHERE floatCol <2.5 "},
+ { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 EXCEPT SELECT intCol FROM {tbl1} WHERE floatCol <2.5 "},
+ { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1}"},
+ { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE strCol = 'monster' UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE strCol = 'baby' "},
+ { "sql": "SELECT * FROM {tbl2} UNION ALL SELECT * FROM {tbl2}"},
+ { "sql": "SELECT intArrayCol, strArrayCol FROM {tbl3} UNION ALL SELECT intArrayCol, strArrayCol FROM {tbl3}"}
+ ]
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org