You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/05/03 20:23:59 UTC

[pinot] branch master updated: [multistage] UNION/INTERSECT/EXCEPT implementation (#10622)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c4d0fe0a3 [multistage] UNION/INTERSECT/EXCEPT implementation  (#10622)
1c4d0fe0a3 is described below

commit 1c4d0fe0a3d41a9385d6de04829c282a484aaf55
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Wed May 3 13:23:52 2023 -0700

    [multistage] UNION/INTERSECT/EXCEPT implementation  (#10622)
---
 .../calcite/rel/rules/PinotQueryRuleSets.java      |   3 +-
 .../rules/PinotSetOpExchangeNodeInsertRule.java    |  87 ++++++++++++
 .../src/test/resources/queries/SetOpPlans.json     |  74 ++++++----
 .../query/runtime/operator/IntersectOperator.java  |  49 +++++++
 .../query/runtime/operator/MinusOperator.java      |  49 +++++++
 .../pinot/query/runtime/operator/SetOperator.java  | 154 +++++++++++++++++++++
 .../query/runtime/operator/UnionOperator.java      |  61 ++++++++
 .../query/runtime/operator/utils/SortUtils.java    |  17 +++
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  25 +++-
 .../runtime/plan/ServerRequestPlanVisitor.java     |   5 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  55 ++++++--
 .../runtime/operator/IntersectOperatorTest.java    | 119 ++++++++++++++++
 .../query/runtime/operator/MinusOperatorTest.java  | 121 ++++++++++++++++
 .../query/runtime/operator/UnionOperatorTest.java  |  91 ++++++++++++
 .../src/test/resources/queries/SetOps.json         |  51 +++++++
 15 files changed, 915 insertions(+), 46 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index b713b9c82c..d728bb758d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -122,6 +122,7 @@ public class PinotQueryRuleSets {
 
       PinotJoinExchangeNodeInsertRule.INSTANCE,
       PinotAggregateExchangeNodeInsertRule.INSTANCE,
-      PinotWindowExchangeNodeInsertRule.INSTANCE
+      PinotWindowExchangeNodeInsertRule.INSTANCE,
+      PinotSetOpExchangeNodeInsertRule.INSTANCE
   );
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
new file mode 100644
index 0000000000..16227a1f83
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Pinot-specific planner rule that inserts a {@link LogicalExchange} between a {@link SetOp} and each of its inputs.
+ * Each exchange is hash-distributed on all columns of the set operation's row type. A set operation that already has
+ * an exchange among its direct inputs is not matched.
+ */
+public class PinotSetOpExchangeNodeInsertRule extends RelOptRule {
+  public static final PinotSetOpExchangeNodeInsertRule INSTANCE =
+      new PinotSetOpExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+  public PinotSetOpExchangeNodeInsertRule(RelBuilderFactory factory) {
+    super(operand(SetOp.class, any()), factory, null);
+  }
+
+  /** Matches a {@link SetOp} only when none of its direct inputs is an exchange. */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1 || !(call.rel(0) instanceof SetOp)) {
+      return false;
+    }
+    SetOp setOp = call.rel(0);
+    return setOp.getInputs().stream().noneMatch(input -> PinotRuleUtils.isExchange(input));
+  }
+
+  /**
+   * Wraps every input in a hash exchange and re-creates the set operation on top of the wrapped inputs.
+   *
+   * @throws UnsupportedOperationException if the matched node is not a logical union, intersect or minus
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    SetOp setOp = call.rel(0);
+    // Hash on every column: field indexes 0 .. fieldCount - 1 of the set operation's row type.
+    List<Integer> hashFields =
+        IntStream.range(0, setOp.getRowType().getFieldCount()).boxed().collect(Collectors.toCollection(ArrayList::new));
+    List<RelNode> exchangedInputs = new ArrayList<>();
+    for (RelNode input : setOp.getInputs()) {
+      exchangedInputs.add(LogicalExchange.create(input, RelDistributions.hash(hashFields)));
+    }
+    SetOp rewritten;
+    if (setOp instanceof LogicalUnion) {
+      rewritten = new LogicalUnion(setOp.getCluster(), setOp.getTraitSet(), exchangedInputs, setOp.all);
+    } else if (setOp instanceof LogicalIntersect) {
+      rewritten = new LogicalIntersect(setOp.getCluster(), setOp.getTraitSet(), exchangedInputs, setOp.all);
+    } else if (setOp instanceof LogicalMinus) {
+      rewritten = new LogicalMinus(setOp.getCluster(), setOp.getTraitSet(), exchangedInputs, setOp.all);
+    } else {
+      throw new UnsupportedOperationException("Unsupported set op node: " + setOp);
+    }
+    call.transformTo(rewritten);
+  }
+}
diff --git a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
index 22bb56a78d..c131fecb49 100644
--- a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
@@ -7,10 +7,12 @@
         "output": [
           "Execution Plan",
           "\nLogicalUnion(all=[true])",
-          "\n  LogicalProject(col1=[$0], col2=[$1])",
-          "\n    LogicalTableScan(table=[[a]])",
-          "\n  LogicalProject(col1=[$0], col2=[$1])",
-          "\n    LogicalTableScan(table=[[b]])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalProject(col1=[$0], col2=[$1])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalProject(col1=[$0], col2=[$1])",
+          "\n      LogicalTableScan(table=[[b]])",
           "\n"
         ]
       },
@@ -20,13 +22,17 @@
         "output": [
           "Execution Plan",
           "\nLogicalUnion(all=[true])",
-          "\n  LogicalUnion(all=[true])",
-          "\n    LogicalProject(col1=[$0], col2=[$1])",
-          "\n      LogicalTableScan(table=[[a]])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalUnion(all=[true])",
+          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        LogicalProject(col1=[$0], col2=[$1])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        LogicalProject(col1=[$0], col2=[$1])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
-          "\n      LogicalTableScan(table=[[b]])",
-          "\n  LogicalProject(col1=[$0], col2=[$1])",
-          "\n    LogicalTableScan(table=[[c]])",
+          "\n      LogicalTableScan(table=[[c]])",
           "\n"
         ]
       },
@@ -39,13 +45,17 @@
           "\n  LogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalAggregate(group=[{0, 1}])",
           "\n      LogicalUnion(all=[true])",
-          "\n        LogicalUnion(all=[true])",
-          "\n          LogicalProject(col1=[$0], col2=[$1])",
-          "\n            LogicalTableScan(table=[[a]])",
+          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalUnion(all=[true])",
+          "\n            LogicalExchange(distribution=[hash[0, 1]])",
+          "\n              LogicalProject(col1=[$0], col2=[$1])",
+          "\n                LogicalTableScan(table=[[a]])",
+          "\n            LogicalExchange(distribution=[hash[0, 1]])",
+          "\n              LogicalProject(col1=[$0], col2=[$1])",
+          "\n                LogicalTableScan(table=[[b]])",
+          "\n        LogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalProject(col1=[$0], col2=[$1])",
-          "\n            LogicalTableScan(table=[[b]])",
-          "\n        LogicalProject(col1=[$0], col2=[$1])",
-          "\n          LogicalTableScan(table=[[c]])",
+          "\n            LogicalTableScan(table=[[c]])",
           "\n"
         ]
       },
@@ -55,13 +65,17 @@
         "output": [
           "Execution Plan",
           "\nLogicalIntersect(all=[false])",
-          "\n  LogicalIntersect(all=[false])",
-          "\n    LogicalProject(col1=[$0], col2=[$1])",
-          "\n      LogicalTableScan(table=[[a]])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalIntersect(all=[false])",
+          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        LogicalProject(col1=[$0], col2=[$1])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        LogicalProject(col1=[$0], col2=[$1])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
-          "\n      LogicalTableScan(table=[[b]])",
-          "\n  LogicalProject(col1=[$0], col2=[$1])",
-          "\n    LogicalTableScan(table=[[c]])",
+          "\n      LogicalTableScan(table=[[c]])",
           "\n"
         ]
       },
@@ -71,13 +85,17 @@
         "output": [
           "Execution Plan",
           "\nLogicalMinus(all=[false])",
-          "\n  LogicalMinus(all=[false])",
-          "\n    LogicalProject(col1=[$0], col2=[$1])",
-          "\n      LogicalTableScan(table=[[a]])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalMinus(all=[false])",
+          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        LogicalProject(col1=[$0], col2=[$1])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        LogicalProject(col1=[$0], col2=[$1])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
-          "\n      LogicalTableScan(table=[[b]])",
-          "\n  LogicalProject(col1=[$0], col2=[$1])",
-          "\n    LogicalTableScan(table=[[c]])",
+          "\n      LogicalTableScan(table=[[c]])",
           "\n"
         ]
       }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/IntersectOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/IntersectOperator.java
new file mode 100644
index 0000000000..b0ba3c6fa3
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/IntersectOperator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ * INTERSECT operator: a left-side row is emitted when its {@link Record} is found in the right-side row set.
+ * The record is removed from the set on a hit, so an identical left-side row does not match a second time.
+ */
+public class IntersectOperator extends SetOperator {
+  public IntersectOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
+      DataSchema dataSchema) {
+    super(opChainExecutionContext, upstreamOperators, dataSchema);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return "INTERSECT";
+  }
+
+  @Override
+  protected boolean handleRowMatched(Object[] row) {
+    Record candidate = new Record(row);
+    return _rightRowSet.remove(candidate);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MinusOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MinusOperator.java
new file mode 100644
index 0000000000..3415bfb3fc
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MinusOperator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ * MINUS/EXCEPT operator: a left-side row is emitted when its {@link Record} is not yet in the right-side row set.
+ * The record is added to the set as it is emitted, so an identical left-side row is not emitted a second time.
+ */
+public class MinusOperator extends SetOperator {
+  public MinusOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
+      DataSchema dataSchema) {
+    super(opChainExecutionContext, upstreamOperators, dataSchema);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return "MINUS";
+  }
+
+  @Override
+  protected boolean handleRowMatched(Object[] row) {
+    Record candidate = new Record(row);
+    return _rightRowSet.add(candidate);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
new file mode 100644
index 0000000000..40667abc3e
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ExplainPlanRows;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * Base class of the set operators (UNION, INTERSECT and EXCEPT). The first upstream operator is the left child and
+ * the second one is the right child.
+ * The right child is drained completely, in a blocking manner, into a set of rows. The left child is then consumed
+ * block by block, and {@link #handleRowMatched(Object[])} decides for each left row whether it is part of the result.
+ * UnionOperator overrides {@link #getNextBlock()} and forwards the blocks of its children without any row matching.
+ */
+public abstract class SetOperator extends MultiStageOperator {
+  protected final Set<Record> _rightRowSet;
+
+  private final List<MultiStageOperator> _upstreamOperators;
+  private final MultiStageOperator _leftChildOperator;
+  private final MultiStageOperator _rightChildOperator;
+
+  private final DataSchema _dataSchema;
+  private TransferableBlock _currentLeftBlock;
+  private Iterator<Object[]> _currentLeftIterator;
+
+  private boolean _isRightBlockConsumed;
+
+  public SetOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
+      DataSchema dataSchema) {
+    super(opChainExecutionContext);
+    _dataSchema = dataSchema;
+    _upstreamOperators = upstreamOperators;
+    _leftChildOperator = upstreamOperators.get(0);
+    _rightChildOperator = upstreamOperators.get(1);
+    _rightRowSet = new HashSet<>();
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return _upstreamOperators;
+  }
+
+  @Override
+  public void prepareForExplainPlan(ExplainPlanRows explainPlanRows) {
+    super.prepareForExplainPlan(explainPlanRows);
+  }
+
+  @Override
+  public void explainPlan(ExplainPlanRows explainPlanRows, int[] globalId, int parentId) {
+    super.explainPlan(explainPlanRows, globalId, parentId);
+  }
+
+  @Override
+  public IndexSegment getIndexSegment() {
+    return super.getIndexSegment();
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    return super.getExecutionStatistics();
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    // A blocking call to construct a set with all the right side rows.
+    if (!_isRightBlockConsumed) {
+      constructRightBlockSet();
+    }
+    return constructResultBlockSet();
+  }
+
+  /** Drains the right child until its end-of-stream block, adding every row of each non-metadata block to the set. */
+  protected void constructRightBlockSet() {
+    TransferableBlock block = _rightChildOperator.nextBlock();
+    while (!block.isEndOfStreamBlock()) {
+      if (block.getType() != DataBlock.Type.METADATA) {
+        for (Object[] row : block.getContainer()) {
+          _rightRowSet.add(new Record(row));
+        }
+      }
+      block = _rightChildOperator.nextBlock();
+    }
+    _isRightBlockConsumed = true;
+  }
+
+  /**
+   * Returns the next block of matched left rows, holding at most MAX_ROW_HOLDER_INITIAL_CAPACITY rows, or an
+   * end-of-stream block once the left child is exhausted and no matched row is pending.
+   */
+  protected TransferableBlock constructResultBlockSet() {
+    // Fresh list per call: the returned block is handed this list, so it must not be cleared or reused afterwards.
+    List<Object[]> rows = new ArrayList<>();
+    if (_currentLeftBlock == null) {
+      _currentLeftBlock = _leftChildOperator.nextBlock();
+    }
+    while (!_currentLeftBlock.isEndOfStreamBlock()) {
+      if (_currentLeftBlock.getType() != DataBlock.Type.METADATA) {
+        // Keep the iterator across calls so that a block left half-read by an early return is resumed, not re-read.
+        if (_currentLeftIterator == null) {
+          _currentLeftIterator = _currentLeftBlock.getContainer().iterator();
+        }
+        while (_currentLeftIterator.hasNext()) {
+          Object[] row = _currentLeftIterator.next();
+          if (handleRowMatched(row)) {
+            rows.add(row);
+            if (rows.size() == SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY) {
+              return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
+            }
+          }
+        }
+        _currentLeftIterator = null;
+      }
+      _currentLeftBlock = _leftChildOperator.nextBlock();
+    }
+    return rows.isEmpty() ? TransferableBlockUtils.getEndOfStreamTransferableBlock()
+        : new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * Decides whether the given left-side row is part of the result; implementations may update the right row set.
+   * @return true if the row is matched.
+   */
+  protected abstract boolean handleRowMatched(Object[] row);
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
new file mode 100644
index 0000000000..4f69b575b1
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ * Union operator for UNION ALL queries. Each call polls the upstream operators in order and passes through the
+ * first block that is not an end-of-stream block; rows are never matched or de-duplicated here.
+ */
+public class UnionOperator extends SetOperator {
+  public UnionOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
+      DataSchema dataSchema) {
+    super(opChainExecutionContext, upstreamOperators, dataSchema);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return "UNION";
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    for (MultiStageOperator child : getChildOperators()) {
+      TransferableBlock candidate = child.nextBlock();
+      if (candidate.isEndOfStreamBlock()) {
+        continue;
+      }
+      return candidate;
+    }
+    return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+  }
+
+  @Override
+  protected boolean handleRowMatched(Object[] row) {
+    throw new UnsupportedOperationException("Union operator does not support row matching");
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
index adc00d9c78..1d2c42a193 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
@@ -59,6 +59,23 @@ public class SortUtils {
       }
     }
 
+    /**
+     * Builds a comparator over every column of the schema, in column order, using a multiplier of 1 for each column.
+     * @param dataSchema schema whose column data types decide which columns use double comparison
+     */
+    public SortComparator(DataSchema dataSchema) {
+      DataSchema.ColumnDataType[] types = dataSchema.getColumnDataTypes();
+      _size = types.length;
+      _valueIndices = new int[_size];
+      _multipliers = new int[_size];
+      _useDoubleComparison = new boolean[_size];
+      for (int col = 0; col < _size; col++) {
+        _valueIndices[col] = col;
+        _multipliers[col] = 1;
+        _useDoubleComparison[col] = types[col].isNumber();
+      }
+    }
+
     @Override
     public int compare(Object[] o1, Object[] o2) {
       for (int i = 0; i < _size; i++) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index ecf5c9672f..4ff613e3ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
@@ -34,14 +36,17 @@ import org.apache.pinot.query.planner.stage.WindowNode;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
+import org.apache.pinot.query.runtime.operator.IntersectOperator;
 import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.MinusOperator;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.SortOperator;
 import org.apache.pinot.query.runtime.operator.SortedMailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.TransformOperator;
+import org.apache.pinot.query.runtime.operator.UnionOperator;
 import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
 
 
@@ -105,8 +110,24 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
 
   @Override
   public MultiStageOperator visitSetOp(SetOpNode setOpNode, PlanRequestContext context) {
-    throw new UnsupportedOperationException(
-        "Stage node of type SetOpNode: " + setOpNode.getSetOpType() + " is not supported!");
+    // Build one child operator per input, preserving the input order of the set operation.
+    List<MultiStageOperator> inputs = new ArrayList<>();
+    for (StageNode input : setOpNode.getInputs()) {
+      inputs.add(input.visit(this, context));
+    }
+    switch (setOpNode.getSetOpType()) {
+      case UNION:
+        return new UnionOperator(context.getOpChainExecutionContext(), inputs,
+            setOpNode.getInputs().get(0).getDataSchema());
+      case INTERSECT:
+        return new IntersectOperator(context.getOpChainExecutionContext(), inputs,
+            setOpNode.getInputs().get(0).getDataSchema());
+      case MINUS:
+        return new MinusOperator(context.getOpChainExecutionContext(), inputs,
+            setOpNode.getInputs().get(0).getDataSchema());
+      default:
+        throw new IllegalStateException("Unsupported SetOp type: " + setOpNode.getSetOpType());
+    }
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index ef2984e4ff..c2c01388d7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -184,8 +184,9 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
   }
 
   @Override
-  public Void visitSetOp(SetOpNode setOpNode, ServerPlanRequestContext context) {
-    throw new UnsupportedOperationException("SetOp not yet supported!");
+  public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context); // delegates to visitChildren; nothing else is done for the set operation itself
+    return null;
   }
 
   @Override
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 5e45b765c8..22d26fe4fb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -234,31 +234,60 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
         return ((Timestamp) l).compareTo((Timestamp) r);
       } else if (l instanceof int[]) {
         int[] larray = (int[]) l;
-        Object[] rarray;
         try {
-          rarray = (Object[]) ((JdbcArray) r).getArray();
+          if (r instanceof JdbcArray) {
+            Object[] rarray = (Object[]) ((JdbcArray) r).getArray();
+            for (int idx = 0; idx < larray.length; idx++) {
+              Number relement = (Number) rarray[idx];
+              if (larray[idx] != relement.intValue()) {
+                return -1;
+              }
+            }
+          } else {
+            int[] rarray = (int[]) r;
+            for (int idx = 0; idx < larray.length; idx++) {
+              if (larray[idx] != rarray[idx]) {
+                return -1;
+              }
+            }
+          }
         } catch (SQLException e) {
           throw new RuntimeException(e);
         }
-        for (int idx = 0; idx < larray.length; idx++) {
-          Number relement = (Number) rarray[idx];
-          if (larray[idx] != relement.intValue()) {
-            return -1;
-          }
-        }
         return 0;
       } else if (l instanceof String[]) {
         String[] larray = (String[]) l;
-        Object[] rarray;
         try {
-          rarray = (Object[]) ((JdbcArray) r).getArray();
+          if (r instanceof JdbcArray) {
+            Object[] rarray = (Object[]) ((JdbcArray) r).getArray();
+            for (int idx = 0; idx < larray.length; idx++) {
+              if (!larray[idx].equals(rarray[idx])) {
+                return -1;
+              }
+            }
+          } else {
+            String[] rarray = (String[]) r;
+            for (int idx = 0; idx < larray.length; idx++) {
+              if (!larray[idx].equals(rarray[idx])) {
+                return -1;
+              }
+            }
+          }
         } catch (SQLException e) {
           throw new RuntimeException(e);
         }
-        for (int idx = 0; idx < larray.length; idx++) {
-          if (!larray[idx].equals(rarray[idx])) {
-            return -1;
+        return 0;
+      } else if (l instanceof JdbcArray) {
+        try {
+          Object[] larray = (Object[]) ((JdbcArray) l).getArray();
+          Object[] rarray = (Object[]) ((JdbcArray) r).getArray();
+          for (int idx = 0; idx < larray.length; idx++) {
+            if (!larray[idx].equals(rarray[idx])) {
+              return -1;
+            }
           }
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
         }
         return 0;
       } else {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
new file mode 100644
index 0000000000..04c14caea1
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link IntersectOperator} with two mocked upstream operators as left and right inputs.
+ */
+public class IntersectOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private MultiStageOperator _leftOperator;
+
+  @Mock
+  private MultiStageOperator _rightOperator;
+
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /** Rows present in both inputs are returned; rows found on only one side are dropped. */
+  @Test
+  public void testIntersectOperator() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{4, "DD"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    IntersectOperator intersectOperator =
+        new IntersectOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+
+    List<Object[]> resultRows = firstRowBlock(intersectOperator).getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    for (int i = 0; i < resultRows.size(); i++) {
+      Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
+    }
+  }
+
+  /** Duplicate rows within either input must not produce duplicate rows in the intersection. */
+  @Test
+  public void testDedup() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"},
+            new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{4, "DD"},
+                new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{4, "DD"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    IntersectOperator intersectOperator =
+        new IntersectOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+
+    List<Object[]> resultRows = firstRowBlock(intersectOperator).getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    for (int i = 0; i < resultRows.size(); i++) {
+      Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
+    }
+  }
+
+  /**
+   * Pulls blocks from the operator until the first ROW block and returns it.
+   * Fails the test, rather than polling forever, if the stream ends before any ROW block is produced.
+   */
+  private static TransferableBlock firstRowBlock(MultiStageOperator operator) {
+    TransferableBlock block = operator.nextBlock();
+    while (block.getType() != DataBlock.Type.ROW) {
+      Assert.assertFalse(block.isEndOfStreamBlock(), "End of stream reached before any ROW block");
+      block = operator.nextBlock();
+    }
+    return block;
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
new file mode 100644
index 0000000000..53e81612ad
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link MinusOperator} with two mocked upstream operators as left and right inputs.
+ */
+public class MinusOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private MultiStageOperator _leftOperator;
+
+  @Mock
+  private MultiStageOperator _rightOperator;
+
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /** Left-input rows that also occur in the right input are removed; the remaining left rows are returned. */
+  @Test
+  public void testExceptOperator() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"},
+            new Object[]{4, "DD"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{5, "EE"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    MinusOperator minusOperator =
+        new MinusOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+
+    List<Object[]> resultRows = firstRowBlock(minusOperator).getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{3, "CC"}, new Object[]{4, "DD"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    for (int i = 0; i < resultRows.size(); i++) {
+      Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
+    }
+  }
+
+  /** Duplicate rows within either input must not produce duplicate rows in the result. */
+  @Test
+  public void testDedup() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"},
+            new Object[]{4, "DD"}, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "CC"},
+            new Object[]{4, "DD"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{5, "EE"},
+                new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{5, "EE"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    MinusOperator minusOperator =
+        new MinusOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+
+    List<Object[]> resultRows = firstRowBlock(minusOperator).getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{3, "CC"}, new Object[]{4, "DD"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    for (int i = 0; i < resultRows.size(); i++) {
+      Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
+    }
+  }
+
+  /**
+   * Pulls blocks from the operator until the first ROW block and returns it.
+   * Fails the test, rather than polling forever, if the stream ends before any ROW block is produced.
+   */
+  private static TransferableBlock firstRowBlock(MultiStageOperator operator) {
+    TransferableBlock block = operator.nextBlock();
+    while (block.getType() != DataBlock.Type.ROW) {
+      Assert.assertFalse(block.isEndOfStreamBlock(), "End of stream reached before any ROW block");
+      block = operator.nextBlock();
+    }
+    return block;
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
new file mode 100644
index 0000000000..16e8c46874
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit test for {@link UnionOperator} with two mocked upstream operators as left and right inputs.
+ */
+public class UnionOperatorTest {
+  private AutoCloseable _mocks;
+
+  @Mock
+  private MultiStageOperator _leftOperator;
+
+  @Mock
+  private MultiStageOperator _rightOperator;
+
+  @Mock
+  private VirtualServerAddress _serverAddress;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    String mockAddress = new VirtualServerAddress("mock", 80, 0).toString();
+    Mockito.when(_serverAddress.toString()).thenReturn(mockAddress);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /** Drains the operator until end of stream and expects the left input's rows followed by the right input's. */
+  @Test
+  public void testUnionOperator() {
+    DataSchema.ColumnDataType[] columnTypes = {DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING};
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, columnTypes);
+    TransferableBlock leftRows = OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new Object[]{2, "BB"});
+    TransferableBlock rightRows =
+        OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new Object[]{4, "bb"}, new Object[]{5, "cc"});
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(leftRows, TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(rightRows, TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    UnionOperator unionOperator =
+        new UnionOperator(OperatorTestUtil.getDefaultContext(), ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+    List<Object[]> actualRows = new ArrayList<>();
+    for (TransferableBlock block = unionOperator.nextBlock(); !block.isEndOfStreamBlock();
+        block = unionOperator.nextBlock()) {
+      actualRows.addAll(block.getContainer());
+    }
+    List<Object[]> expectedRows =
+        Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{3, "aa"}, new Object[]{4, "bb"},
+            new Object[]{5, "cc"});
+    Assert.assertEquals(actualRows.size(), expectedRows.size());
+    int rowIdx = 0;
+    for (Object[] expectedRow : expectedRows) {
+      Assert.assertEquals(actualRows.get(rowIdx++), expectedRow);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOps.json b/pinot-query-runtime/src/test/resources/queries/SetOps.json
new file mode 100644
index 0000000000..4ae8a800e6
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/SetOps.json
@@ -0,0 +1,51 @@
+{
+  "set_op_test": {
+    "tables": {
+      "tbl1": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "longCol", "type": "LONG"},
+          {"name": "floatCol", "type": "FLOAT"},
+          {"name": "doubleCol", "type": "DOUBLE"},
+          {"name": "strCol", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, 8, 3.0, 5.176518e16, "lyons"],
+          [2, 9, 4.0, 4.608155e11, "onan"],
+          [3, 14, 5.0, 1.249261e11, "rudvalis"],
+          [4, 21, 6.0, 8.677557e19, "janko"],
+          [1, 41, 2.0, 4.15478e33, "baby"],
+          [2, 46, 1.0, 8.08017e53, "monster"]
+        ]
+      },
+      "tbl2": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "strCol", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, "foo"],
+          [2, "bar"]
+        ]
+      },
+      "tbl3": {
+        "schema":[
+          {"name": "intArrayCol", "type": "INT", "isSingleValue": false},
+          {"name": "strArrayCol", "type": "STRING", "isSingleValue": false}
+        ],
+        "inputs": [
+          [[1, 10], ["foo1", "foo2"]]
+        ]
+      }
+    },
+    "queries": [
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 MINUS SELECT intCol FROM {tbl1} WHERE floatCol <2.5 "},
+      { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 EXCEPT SELECT intCol FROM {tbl1} WHERE floatCol <2.5 "},
+      { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1}"},
+      { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE strCol = 'monster' UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE strCol = 'baby' "},
+      { "sql": "SELECT * FROM {tbl2} UNION ALL SELECT * FROM {tbl2}"},
+      { "sql": "SELECT intArrayCol, strArrayCol FROM {tbl3} UNION ALL SELECT intArrayCol, strArrayCol FROM {tbl3}"}
+    ]
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org