You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/06/04 14:06:31 UTC

[ignite] branch sql-calcite updated: IGNITE-14638 Support for INTERSECT operator - Fixes #9095.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new de76397  IGNITE-14638 Support for INTERSECT operator - Fixes #9095.
de76397 is described below

commit de76397d33fda782512e0b4c034728877c455b8d
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Jun 4 17:04:55 2021 +0300

    IGNITE-14638 Support for INTERSECT operator - Fixes #9095.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/exec/ExecutionServiceImpl.java   |   1 +
 .../query/calcite/exec/LogicalRelImplementor.java  |  35 +-
 .../rel/{MinusNode.java => AbstractSetOpNode.java} | 135 +++----
 .../query/calcite/exec/rel/IntersectNode.java      | 118 ++++++
 .../query/calcite/exec/rel/MinusNode.java          | 330 ++--------------
 .../query/calcite/metadata/IgniteMdRowCount.java   |   6 +-
 .../processors/query/calcite/prepare/Cloner.java   |  16 +-
 .../query/calcite/prepare/IgniteRelShuttle.java    |  23 +-
 .../query/calcite/prepare/PlannerPhase.java        |   9 +-
 .../query/calcite/rel/IgniteRelVisitor.java        |  16 +-
 .../query/calcite/rel/IgniteTableFunctionScan.java |  12 +
 .../query/calcite/rel/set/IgniteIntersect.java     |  67 ++++
 .../IgniteMapIntersect.java}                       |  58 ++-
 .../query/calcite/rel/set/IgniteMapMinus.java      |  74 +---
 .../{IgniteMapMinus.java => IgniteMapSetOp.java}   |  86 ++---
 .../set/{IgniteMinusBase.java => IgniteMinus.java} |  46 +--
 ...ReduceMinus.java => IgniteReduceIntersect.java} |  64 +---
 .../query/calcite/rel/set/IgniteReduceMinus.java   |  50 +--
 .../query/calcite/rel/set/IgniteReduceSetOp.java   |  74 ++++
 .../set/{IgniteMinusBase.java => IgniteSetOp.java} |  55 +--
 .../calcite/rel/set/IgniteSingleIntersect.java     |  67 ++++
 .../query/calcite/rel/set/IgniteSingleMinus.java   |  71 +---
 ...niteSingleMinus.java => IgniteSingleSetOp.java} |  62 +--
 .../query/calcite/rule/MinusConverterRule.java     |  94 -----
 .../query/calcite/rule/SetOpConverterRule.java     | 185 +++++++++
 .../query/calcite/CalciteQueryProcessorTest.java   | 223 -----------
 ...onTest.java => AbstractSetOpExecutionTest.java} | 109 ++----
 .../calcite/exec/rel/IntersectExecutionTest.java   |  82 ++++
 .../query/calcite/exec/rel/MinusExecutionTest.java | 204 ++--------
 .../integration/AbstractBasicIntegrationTest.java  |   2 +-
 .../calcite/integration/SetOpIntegrationTest.java  | 425 +++++++++++++++++++++
 .../query/calcite/planner/AbstractPlannerTest.java |   2 +
 ...xceptPlannerTest.java => SetOpPlannerTest.java} | 265 ++++++++-----
 ...tionTest.java => TableFunctionPlannerTest.java} |   2 +-
 .../ignite/testsuites/ExecutionTestSuite.java      |   2 +
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 .../apache/ignite/testsuites/PlannerTestSuite.java |   8 +-
 37 files changed, 1511 insertions(+), 1569 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 330e8c2..c161e89 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -552,6 +552,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
             case VALUES:
             case UNION:
             case EXCEPT:
+            case INTERSECT:
                 return prepareQuery(sqlNode, ctx);
 
             case INSERT:
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index dc559f5..70480b9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -26,7 +26,9 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.core.Spool;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
@@ -37,11 +39,13 @@ import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFa
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractSetOpNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IntersectNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusNode;
@@ -85,10 +89,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMinusBase;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
@@ -432,29 +433,21 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
-    @Override public Node<Row> visit(IgniteSingleMinus rel) {
-        return visit(rel, AggregateType.SINGLE);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Node<Row> visit(IgniteMapMinus rel) {
-        return visit(rel, AggregateType.MAP);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Node<Row> visit(IgniteReduceMinus rel) {
-        return visit(rel, AggregateType.REDUCE);
-    }
-
-    /** Visit IgniteMinus rel. */
-    private Node<Row> visit(IgniteMinusBase rel, AggregateType aggType) {
+    @Override public Node<Row> visit(IgniteSetOp rel) {
         RelDataType rowType = rel.getRowType();
 
         RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
         List<Node<Row>> inputs = Commons.transform(rel.getInputs(), this::visit);
 
-        MinusNode<Row> node = new MinusNode<>(ctx, rowType, aggType, rel.all, rowFactory);
+        AbstractSetOpNode<Row> node;
+
+        if (rel instanceof Minus)
+            node = new MinusNode<>(ctx, rowType, rel.aggregateType(), rel.all(), rowFactory);
+        else if (rel instanceof Intersect)
+            node = new IntersectNode<>(ctx, rowType, rel.aggregateType(), rel.all(), rowFactory, rel.getInputs().size());
+        else
+            throw new AssertionError();
 
         node.register(inputs);
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
similarity index 76%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
index 72c8025..6e60a88 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
@@ -32,20 +32,14 @@ import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
- * Execution node for MINUS (EXCEPT) operator.
+ * Abstract execution node for set operators (EXCEPT, INTERSECT).
  */
-public class MinusNode<Row> extends AbstractNode<Row> {
+public abstract class AbstractSetOpNode<Row> extends AbstractNode<Row> {
     /** */
     private final AggregateType type;
 
     /** */
-    private final boolean all;
-
-    /** */
-    private final RowFactory<Row> rowFactory;
-
-    /** */
-    private final Grouping grouping;
+    private final Grouping<Row> grouping;
 
     /** */
     private int requested;
@@ -59,18 +53,13 @@ public class MinusNode<Row> extends AbstractNode<Row> {
     /** */
     private boolean inLoop;
 
-    /**
-     * @param ctx Execution context.
-     */
-    public MinusNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
-        RowFactory<Row> rowFactory) {
+    /** */
+    protected AbstractSetOpNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, Grouping<Row> grouping) {
         super(ctx, rowType);
 
-        this.all = all;
         this.type = type;
-        this.rowFactory = rowFactory;
-
-        grouping = new Grouping();
+        this.grouping = grouping;
     }
 
     /** {@inheritDoc} */
@@ -112,6 +101,8 @@ public class MinusNode<Row> extends AbstractNode<Row> {
 
         checkState();
 
+        grouping.endOfSet(idx);
+
         if (type == AggregateType.SINGLE && grouping.isEmpty())
             curSrcIdx = sources().size(); // Skip subsequent sources.
         else
@@ -137,15 +128,15 @@ public class MinusNode<Row> extends AbstractNode<Row> {
     @Override protected Downstream<Row> requestDownstream(int idx) {
         return new Downstream<Row>() {
             @Override public void push(Row row) throws Exception {
-                MinusNode.this.push(row, idx);
+                AbstractSetOpNode.this.push(row, idx);
             }
 
             @Override public void end() throws Exception {
-                MinusNode.this.end(idx);
+                AbstractSetOpNode.this.end(idx);
             }
 
             @Override public void onError(Throwable e) {
-                MinusNode.this.onError(e);
+                AbstractSetOpNode.this.onError(e);
             }
         };
     }
@@ -164,13 +155,12 @@ public class MinusNode<Row> extends AbstractNode<Row> {
         inLoop = true;
 
         try {
-            while (requested > 0 && !grouping.isEmpty()) {
+            if (requested > 0 && !grouping.isEmpty()) {
                 int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
 
                 for (Row row : grouping.getRows(toSnd)) {
-                    checkState();
-
                     requested--;
+
                     downstream().push(row);
 
                     processed++;
@@ -196,19 +186,31 @@ public class MinusNode<Row> extends AbstractNode<Row> {
     }
 
     /** */
-    private class Grouping {
-        /**
-         * Value in this map will always have 2 elements, first - count of keys in the first set, second - count of
-         * keys in all sets except first.
-         */
-        private final Map<GroupKey, int[]> groups = new HashMap<>();
+    protected abstract static class Grouping<Row> {
+        /** */
+        protected final Map<GroupKey, int[]> groups = new HashMap<>();
+
+        /** */
+        protected final RowHandler<Row> hnd;
+
+        /** */
+        protected final AggregateType type;
 
         /** */
-        private final RowHandler<Row> hnd;
+        protected final boolean all;
 
         /** */
-        private Grouping() {
-            hnd = context().rowHandler();
+        protected final RowFactory<Row> rowFactory;
+
+        /** Processed rows count in current set. */
+        protected int rowsCnt = 0;
+
+        /** */
+        protected Grouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type, boolean all) {
+            hnd = ctx.rowHandler();
+            this.type = type;
+            this.all = all;
+            this.rowFactory = rowFactory;
         }
 
         /** */
@@ -222,6 +224,8 @@ public class MinusNode<Row> extends AbstractNode<Row> {
                 addOnMapper(row, setIdx);
             else
                 addOnSingle(row, setIdx);
+
+            rowsCnt++;
         }
 
         /**
@@ -239,7 +243,7 @@ public class MinusNode<Row> extends AbstractNode<Row> {
         }
 
         /** */
-        private GroupKey key(Row row) {
+        protected GroupKey key(Row row) {
             int size = hnd.columnCount(row);
 
             Object[] fields = new Object[size];
@@ -251,49 +255,31 @@ public class MinusNode<Row> extends AbstractNode<Row> {
         }
 
         /** */
-        private void addOnSingle(Row row, int setIdx) {
-            int[] cntrs;
-
-            GroupKey key = key(row);
-
-            if (setIdx == 0) {
-                cntrs = groups.computeIfAbsent(key, k -> new int[2]);
-
-                cntrs[0]++;
-            }
-            else {
-                cntrs = groups.get(key);
-
-                if (cntrs != null) {
-                    cntrs[1]++;
-
-                    if (cntrs[1] >= cntrs[0])
-                        groups.remove(key);
-                }
-            }
+        protected void endOfSet(int setIdx) {
+            rowsCnt = 0;
         }
 
         /** */
-        private void addOnMapper(Row row, int setIdx) {
-            int[] cntrs = groups.computeIfAbsent(key(row), k -> new int[2]);
+        protected abstract void addOnSingle(Row row, int setIdx);
 
-            cntrs[setIdx == 0 ? 0 : 1]++;
-        }
+        /** */
+        protected abstract void addOnMapper(Row row, int setIdx);
 
         /** */
-        private void addOnReducer(Row row) {
+        protected void addOnReducer(Row row) {
             GroupKey grpKey = (GroupKey)hnd.get(0, row);
+            int[] cntrsMap = (int[])hnd.get(1, row);
 
-            int[] cntrs = groups.computeIfAbsent(grpKey, k -> new int[2]);
+            int[] cntrs = groups.computeIfAbsent(grpKey, k -> new int[cntrsMap.length]);
 
-            int[] cntrsMap = (int[])hnd.get(1, row);
+            assert cntrs.length == cntrsMap.length;
 
             for (int i = 0; i < cntrsMap.length; i++)
                 cntrs[i] += cntrsMap[i];
         }
 
         /** */
-        private List<Row> getOnMapper(int cnt) {
+        protected List<Row> getOnMapper(int cnt) {
             Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
 
             int amount = Math.min(cnt, groups.size());
@@ -303,7 +289,7 @@ public class MinusNode<Row> extends AbstractNode<Row> {
                 Map.Entry<GroupKey, int[]> entry = it.next();
 
                 // Skip row if it doesn't affect the final result.
-                if (entry.getValue()[0] != entry.getValue()[1]) {
+                if (affectResult(entry.getValue())) {
                     res.add(rowFactory.create(entry.getKey(), entry.getValue()));
 
                     amount--;
@@ -316,7 +302,7 @@ public class MinusNode<Row> extends AbstractNode<Row> {
         }
 
         /** */
-        private List<Row> getOnSingleOrReducer(int cnt) {
+        protected List<Row> getOnSingleOrReducer(int cnt) {
             Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
 
             List<Row> res = new ArrayList<>(cnt);
@@ -360,23 +346,16 @@ public class MinusNode<Row> extends AbstractNode<Row> {
             return res;
         }
 
-        /** */
-        private int availableRows(int[] cntrs) {
-            assert cntrs.length == 2;
-
-            if (all)
-                return Math.max(cntrs[0] - cntrs[1], 0);
-            else
-                return cntrs[1] == 0 ? 1 : 0;
-        }
+        /**
+         * Return {@code true} if counters affects the final result, or {@code false} if row can be skipped.
+         */
+        protected abstract boolean affectResult(int[] cntrs);
 
         /** */
-        private void decrementAvailableRows(int[] cntrs, int amount) {
-            assert amount > 0;
-            assert all;
+        protected abstract int availableRows(int[] cntrs);
 
-            cntrs[0] -= amount;
-        }
+        /** */
+        protected abstract void decrementAvailableRows(int[] cntrs, int amount);
 
         /** */
         private boolean isEmpty() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
new file mode 100644
index 0000000..15d4bbf
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+
+/**
+ * Execution node for INTERSECT operator.
+ */
+public class IntersectNode<Row> extends AbstractSetOpNode<Row> {
+    /** */
+    public IntersectNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, int inputsCnt) {
+        super(ctx, rowType, type, all, rowFactory, new IntersectGrouping<>(ctx, rowFactory, type, all, inputsCnt));
+    }
+
+    /** */
+    private static class IntersectGrouping<Row> extends Grouping<Row> {
+        /** Inputs count. */
+        private final int inputsCnt;
+
+        /** */
+        private IntersectGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type,
+            boolean all, int inputsCnt) {
+            super(ctx, rowFactory, type, all);
+
+            this.inputsCnt = inputsCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void endOfSet(int setIdx) {
+            if (type == AggregateType.SINGLE && rowsCnt == 0)
+                groups.clear();
+
+            super.endOfSet(setIdx);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void addOnSingle(Row row, int setIdx) {
+            int[] cntrs;
+
+            GroupKey key = key(row);
+
+            if (setIdx == 0) {
+                cntrs = groups.computeIfAbsent(key, k -> new int[inputsCnt]);
+
+                cntrs[0]++;
+            }
+            else {
+                cntrs = groups.get(key);
+
+                if (cntrs != null) {
+                    if (cntrs[setIdx - 1] == 0)
+                        groups.remove(key);
+                    else
+                        cntrs[setIdx]++;
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void addOnMapper(Row row, int setIdx) {
+            int[] cntrs = groups.computeIfAbsent(key(row), k -> new int[inputsCnt]);
+
+            cntrs[setIdx]++;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean affectResult(int[] cntrs) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int availableRows(int[] cntrs) {
+            int cnt = cntrs[0];
+
+            for (int i = 1; i < cntrs.length; i++) {
+                if (cntrs[i] < cnt)
+                    cnt = cntrs[i];
+            }
+
+            if (all) {
+                cntrs[0] = cnt; // Whith this we can decrement only the first element to get the same result.
+
+                return cnt;
+            }
+            else
+                return cnt == 0 ? 0 : 1;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void decrementAvailableRows(int[] cntrs, int amount) {
+            assert amount > 0;
+            assert all;
+
+            cntrs[0] -= amount;
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
index 72c8025..056fbbb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
@@ -17,251 +17,43 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * Execution node for MINUS (EXCEPT) operator.
  */
-public class MinusNode<Row> extends AbstractNode<Row> {
+public class MinusNode<Row> extends AbstractSetOpNode<Row> {
     /** */
-    private final AggregateType type;
-
-    /** */
-    private final boolean all;
-
-    /** */
-    private final RowFactory<Row> rowFactory;
-
-    /** */
-    private final Grouping grouping;
-
-    /** */
-    private int requested;
-
-    /** */
-    private int waiting;
-
-    /** Current source index. */
-    private int curSrcIdx;
-
-    /** */
-    private boolean inLoop;
-
-    /**
-     * @param ctx Execution context.
-     */
     public MinusNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
         RowFactory<Row> rowFactory) {
-        super(ctx, rowType);
-
-        this.all = all;
-        this.type = type;
-        this.rowFactory = rowFactory;
-
-        grouping = new Grouping();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void request(int rowsCnt) throws Exception {
-        assert !F.isEmpty(sources());
-        assert rowsCnt > 0 && requested == 0;
-        assert waiting <= 0;
-
-        checkState();
-
-        requested = rowsCnt;
-
-        if (waiting == 0)
-            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
-        else if (!inLoop)
-            context().execute(this::flush, this::onError);
-    }
-
-    /** */
-    public void push(Row row, int idx) throws Exception {
-        assert downstream() != null;
-        assert waiting > 0;
-
-        checkState();
-
-        waiting--;
-
-        grouping.add(row, idx);
-
-        if (waiting == 0)
-            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
-    }
-
-    /** */
-    public void end(int idx) throws Exception {
-        assert downstream() != null;
-        assert waiting > 0;
-        assert curSrcIdx == idx;
-
-        checkState();
-
-        if (type == AggregateType.SINGLE && grouping.isEmpty())
-            curSrcIdx = sources().size(); // Skip subsequent sources.
-        else
-            curSrcIdx++;
-
-        if (curSrcIdx >= sources().size()) {
-            waiting = -1;
-
-            flush();
-        }
-        else
-            sources().get(curSrcIdx).request(waiting);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void rewindInternal() {
-        requested = 0;
-        waiting = 0;
-        grouping.groups.clear();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Downstream<Row> requestDownstream(int idx) {
-        return new Downstream<Row>() {
-            @Override public void push(Row row) throws Exception {
-                MinusNode.this.push(row, idx);
-            }
-
-            @Override public void end() throws Exception {
-                MinusNode.this.end(idx);
-            }
-
-            @Override public void onError(Throwable e) {
-                MinusNode.this.onError(e);
-            }
-        };
-    }
-
-    /** */
-    private void flush() throws Exception {
-        if (isClosed())
-            return;
-
-        checkState();
-
-        assert waiting == -1;
-
-        int processed = 0;
-
-        inLoop = true;
-
-        try {
-            while (requested > 0 && !grouping.isEmpty()) {
-                int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
-
-                for (Row row : grouping.getRows(toSnd)) {
-                    checkState();
-
-                    requested--;
-                    downstream().push(row);
-
-                    processed++;
-                }
-
-                if (processed >= IN_BUFFER_SIZE && requested > 0) {
-                    // Allow others to do their job.
-                    context().execute(this::flush, this::onError);
-
-                    return;
-                }
-            }
-        }
-        finally {
-            inLoop = false;
-        }
-
-        if (requested > 0) {
-            requested = 0;
-
-            downstream().end();
-        }
+        super(ctx, rowType, type, all, rowFactory, new MinusGrouping<>(ctx, rowFactory, type, all));
     }
 
     /** */
-    private class Grouping {
-        /**
-         * Value in this map will always have 2 elements, first - count of keys in the first set, second - count of
-         * keys in all sets except first.
-         */
-        private final Map<GroupKey, int[]> groups = new HashMap<>();
-
-        /** */
-        private final RowHandler<Row> hnd;
-
-        /** */
-        private Grouping() {
-            hnd = context().rowHandler();
-        }
-
-        /** */
-        private void add(Row row, int setIdx) {
-            if (type == AggregateType.REDUCE) {
-                assert setIdx == 0 : "Unexpected set index: " + setIdx;
-
-                addOnReducer(row);
-            }
-            else if (type == AggregateType.MAP)
-                addOnMapper(row, setIdx);
-            else
-                addOnSingle(row, setIdx);
-        }
-
-        /**
-         * @param cnt Number of rows.
-         *
-         * @return Actually sent rows number.
-         */
-        private List<Row> getRows(int cnt) {
-            if (F.isEmpty(groups))
-                return Collections.emptyList();
-            else if (type == AggregateType.MAP)
-                return getOnMapper(cnt);
-            else
-                return getOnSingleOrReducer(cnt);
-        }
-
+    private static class MinusGrouping<Row> extends Grouping<Row> {
         /** */
-        private GroupKey key(Row row) {
-            int size = hnd.columnCount(row);
-
-            Object[] fields = new Object[size];
-
-            for (int i = 0; i < size; i++)
-                fields[i] = hnd.get(i, row);
-
-            return new GroupKey(fields);
+        private MinusGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type, boolean all) {
+            super(ctx, rowFactory, type, all);
         }
 
-        /** */
-        private void addOnSingle(Row row, int setIdx) {
+        /** {@inheritDoc} */
+        @Override protected void addOnSingle(Row row, int setIdx) {
             int[] cntrs;
 
             GroupKey key = key(row);
 
             if (setIdx == 0) {
+                // Value in the map will always have 2 elements, first - count of keys in the first set,
+                // second - count of keys in all sets except first.
                 cntrs = groups.computeIfAbsent(key, k -> new int[2]);
 
                 cntrs[0]++;
             }
-            else {
+            else if (all) {
                 cntrs = groups.get(key);
 
                 if (cntrs != null) {
@@ -271,97 +63,26 @@ public class MinusNode<Row> extends AbstractNode<Row> {
                         groups.remove(key);
                 }
             }
+            else
+                groups.remove(key);
         }
 
-        /** */
-        private void addOnMapper(Row row, int setIdx) {
+        /** {@inheritDoc} */
+        @Override protected void addOnMapper(Row row, int setIdx) {
+            // Value in the map will always have 2 elements, first - count of keys in the first set,
+            // second - count of keys in all sets except first.
             int[] cntrs = groups.computeIfAbsent(key(row), k -> new int[2]);
 
             cntrs[setIdx == 0 ? 0 : 1]++;
         }
 
-        /** */
-        private void addOnReducer(Row row) {
-            GroupKey grpKey = (GroupKey)hnd.get(0, row);
-
-            int[] cntrs = groups.computeIfAbsent(grpKey, k -> new int[2]);
-
-            int[] cntrsMap = (int[])hnd.get(1, row);
-
-            for (int i = 0; i < cntrsMap.length; i++)
-                cntrs[i] += cntrsMap[i];
-        }
-
-        /** */
-        private List<Row> getOnMapper(int cnt) {
-            Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
-
-            int amount = Math.min(cnt, groups.size());
-            List<Row> res = new ArrayList<>(amount);
-
-            while (amount > 0 && it.hasNext()) {
-                Map.Entry<GroupKey, int[]> entry = it.next();
-
-                // Skip row if it doesn't affect the final result.
-                if (entry.getValue()[0] != entry.getValue()[1]) {
-                    res.add(rowFactory.create(entry.getKey(), entry.getValue()));
-
-                    amount--;
-                }
-
-                it.remove();
-            }
-
-            return res;
-        }
-
-        /** */
-        private List<Row> getOnSingleOrReducer(int cnt) {
-            Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
-
-            List<Row> res = new ArrayList<>(cnt);
-
-            while (it.hasNext() && cnt > 0) {
-                Map.Entry<GroupKey, int[]> entry = it.next();
-
-                GroupKey key = entry.getKey();
-
-                Object[] fields = new Object[key.fieldsCount()];
-
-                for (int i = 0; i < fields.length; i++)
-                    fields[i] = key.field(i);
-
-                Row row = rowFactory.create(fields);
-
-                int[] cntrs = entry.getValue();
-
-                int availableRows = availableRows(entry.getValue());
-
-                if (availableRows <= cnt) {
-                    it.remove();
-
-                    if (availableRows == 0)
-                        continue;
-
-                    cnt -= availableRows;
-                }
-                else {
-                    availableRows = cnt;
-
-                    decrementAvailableRows(cntrs, availableRows);
-
-                    cnt = 0;
-                }
-
-                for (int i = 0; i < availableRows; i++)
-                    res.add(row);
-            }
-
-            return res;
+        /** {@inheritDoc} */
+        @Override protected boolean affectResult(int[] cntrs) {
+            return cntrs[0] != cntrs[1];
         }
 
-        /** */
-        private int availableRows(int[] cntrs) {
+        /** {@inheritDoc} */
+        @Override protected int availableRows(int[] cntrs) {
             assert cntrs.length == 2;
 
             if (all)
@@ -370,17 +91,12 @@ public class MinusNode<Row> extends AbstractNode<Row> {
                 return cntrs[1] == 0 ? 1 : 0;
         }
 
-        /** */
-        private void decrementAvailableRows(int[] cntrs, int amount) {
+        /** {@inheritDoc} */
+        @Override protected void decrementAvailableRows(int[] cntrs, int amount) {
             assert amount > 0;
             assert all;
 
             cntrs[0] -= amount;
         }
-
-        /** */
-        private boolean isEmpty() {
-            return groups.isEmpty();
-        }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 67c4b23..102d969 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -32,7 +32,7 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMinusBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
@@ -118,9 +118,9 @@ public class IgniteMdRowCount extends RelMdRowCount {
     }
 
     /**
-     * Estimation of row count for MINUS (EXCEPT) operator.
+     * Estimation of row count for set op (MINUS, INTERSECT).
      */
-    public double getRowCount(IgniteMinusBase rel, RelMetadataQuery mq) {
+    public double getRowCount(IgniteSetOp rel, RelMetadataQuery mq) {
         return rel.estimateRowCount(mq);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index e5f88c4..6241243 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -47,9 +47,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -230,17 +228,7 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSingleMinus rel) {
-        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteMapMinus rel) {
-        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteReduceMinus rel) {
+    @Override public IgniteRel visit(IgniteSetOp rel) {
         return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 8f5aec2..6978395 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.List;
-
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -47,9 +46,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /** */
@@ -180,28 +177,18 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteRel rel) {
-        return rel.accept(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSingleMinus rel) {
+    @Override public IgniteRel visit(IgniteSetOp rel) {
         return processNode(rel);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteMapMinus rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteReduceMinus rel) {
+    @Override public IgniteRel visit(IgniteTableFunctionScan rel) {
         return processNode(rel);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteTableFunctionScan rel) {
-        return processNode(rel);
+    @Override public IgniteRel visit(IgniteRel rel) {
+        return rel.accept(this);
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 722e8f3..f2a479d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -48,9 +48,9 @@ import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMerge
 import org.apache.ignite.internal.processors.query.calcite.rule.HashAggregateConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.MergeJoinConverterRule;
-import org.apache.ignite.internal.processors.query.calcite.rule.MinusConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.NestedLoopJoinConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverterRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.SetOpConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.SortConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.TableFunctionScanConverterRule;
@@ -144,6 +144,7 @@ public enum PlannerPhase {
 
                     CoreRules.UNION_MERGE,
                     CoreRules.MINUS_MERGE,
+                    CoreRules.INTERSECT_MERGE,
                     CoreRules.UNION_REMOVE,
                     CoreRules.JOIN_COMMUTE,
                     CoreRules.AGGREGATE_REMOVE,
@@ -178,8 +179,10 @@ public enum PlannerPhase {
                     HashAggregateConverterRule.MAP_REDUCE,
                     SortAggregateConverterRule.SINGLE,
                     SortAggregateConverterRule.MAP_REDUCE,
-                    MinusConverterRule.SINGLE,
-                    MinusConverterRule.MAP_REDUCE,
+                    SetOpConverterRule.SINGLE_MINUS,
+                    SetOpConverterRule.MAP_REDUCE_MINUS,
+                    SetOpConverterRule.SINGLE_INTERSECT,
+                    SetOpConverterRule.MAP_REDUCE_INTERSECT,
                     ProjectConverterRule.INSTANCE,
                     FilterConverterRule.INSTANCE,
                     TableModifyConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 95e5f2f..444f254 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -23,9 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
 
 /**
  * A visitor to traverse an Ignite relational nodes tree.
@@ -159,17 +157,7 @@ public interface IgniteRelVisitor<T> {
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
      */
-    T visit(IgniteSingleMinus rel);
-
-    /**
-     * See {@link IgniteRelVisitor#visit(IgniteRel)}
-     */
-    T visit(IgniteMapMinus rel);
-
-    /**
-     * See {@link IgniteRelVisitor#visit(IgniteRel)}
-     */
-    T visit(IgniteReduceMinus rel);
+    T visit(IgniteSetOp rel);
 
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
index 0e0fb43..aed4e38 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableFunctionScan;
 import org.apache.calcite.rel.metadata.RelColumnMapping;
@@ -31,6 +32,8 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.ignite.internal.util.typedef.F;
 
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
 /**
  * Relational operator for table function scan.
  */
@@ -50,6 +53,15 @@ public class IgniteTableFunctionScan extends TableFunctionScan implements Ignite
         super(cluster, traits, ImmutableList.of(), call, null, rowType, null);
     }
 
+    /**
+     * Constructor used for deserialization.
+     *
+     * @param input Serialized representation.
+     */
+    public IgniteTableFunctionScan(RelInput input) {
+        super(changeTraits(input, IgniteConvention.INSTANCE));
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteTableFunctionScan(cluster, getTraitSet(), getCall(), getRowType());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteIntersect.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteIntersect.java
new file mode 100644
index 0000000..3f876a4
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteIntersect.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+
+/**
+ * Base class for physical INTERSECT set op.
+ */
+public abstract class IgniteIntersect extends Intersect implements IgniteSetOp {
+    /** */
+    IgniteIntersect(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+        super(cluster, traits, inputs, all);
+    }
+
+    /** {@inheritDoc} */
+    protected IgniteIntersect(RelInput input) {
+        super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
+    }
+
+    /** {@inheritDoc} */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        final List<RelNode> inputs = getInputs();
+
+        double rows = mq.getRowCount(inputs.get(0));
+
+        for (int i = 1; i < inputs.size(); i++)
+            rows = 0.5 * Math.min(rows, mq.getRowCount(inputs.get(i)));
+
+        return rows;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return computeSetOpCost(planner, mq);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean all() {
+        return all;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapIntersect.java
similarity index 50%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapIntersect.java
index 0e0fb43..0cc1771 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapIntersect.java
@@ -15,44 +15,45 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
 
-import java.lang.reflect.Type;
 import java.util.List;
-import java.util.Set;
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableFunctionScan;
-import org.apache.calcite.rel.metadata.RelColumnMapping;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
- * Relational operator for table function scan.
+ * Physical node for MAP phase of INTERSECT operator.
  */
-public class IgniteTableFunctionScan extends TableFunctionScan implements IgniteRel {
-    /** Default estimate row count. */
-    private static final int ESTIMATE_ROW_COUNT = 100;
-
-    /**
-     * Creates a TableFunctionScan.
-     */
-    public IgniteTableFunctionScan(
+public class IgniteMapIntersect extends IgniteIntersect implements IgniteMapSetOp {
+    /** */
+    public IgniteMapIntersect(
         RelOptCluster cluster,
-        RelTraitSet traits,
-        RexNode call,
-        RelDataType rowType
+        RelTraitSet traitSet,
+        List<RelNode> inputs,
+        boolean all
     ) {
-        super(cluster, traits, ImmutableList.of(), call, null, rowType, null);
+        super(cluster, traitSet, inputs, all);
+    }
+
+    /** */
+    public IgniteMapIntersect(RelInput input) {
+        super(input);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteMapIntersect copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteMapIntersect(getCluster(), traitSet, inputs, all);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteTableFunctionScan(cluster, getTraitSet(), getCall(), getRowType());
+        return new IgniteMapIntersect(cluster, getTraitSet(), Commons.cast(inputs), all);
     }
 
     /** {@inheritDoc} */
@@ -61,15 +62,12 @@ public class IgniteTableFunctionScan extends TableFunctionScan implements Ignite
     }
 
     /** {@inheritDoc} */
-    @Override public TableFunctionScan copy(RelTraitSet traitSet, List<RelNode> inputs, RexNode rexCall,
-        Type elementType, RelDataType rowType, Set<RelColumnMapping> columnMappings) {
-        assert F.isEmpty(inputs);
-
-        return this;
+    @Override protected RelDataType deriveRowType() {
+        return buildRowType();
     }
 
     /** {@inheritDoc} */
-    @Override public double estimateRowCount(RelMetadataQuery mq) {
-        return ESTIMATE_ROW_COUNT;
+    @Override public int aggregateFieldsCount() {
+        return getInput(0).getRowType().getFieldCount() + getInputs().size();
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
index 061cc1c..9bbfa5b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
@@ -18,31 +18,19 @@
 package org.apache.ignite.internal.processors.query.calcite.rel.set;
 
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  * Physical node for MAP phase of MINUS (EXCEPT) operator.
  */
-public class IgniteMapMinus extends IgniteMinusBase {
+public class IgniteMapMinus extends IgniteMinus implements IgniteMapSetOp {
     /** */
     public IgniteMapMinus(
         RelOptCluster cluster,
@@ -75,67 +63,11 @@ public class IgniteMapMinus extends IgniteMinusBase {
 
     /** {@inheritDoc} */
     @Override protected RelDataType deriveRowType() {
-        RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
-
-        assert typeFactory instanceof IgniteTypeFactory;
-
-        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
-
-        builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
-        builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
-
-        return builder.build();
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        boolean rewindable = inputTraits.stream()
-            .map(TraitUtils::rewindability)
-            .allMatch(RewindabilityTrait::rewindable);
-
-        if (rewindable)
-            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
-            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
-            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
-
-        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
-            return ImmutableList.of(); // Mixing of single and random is prohibited.
-
-        return ImmutableList.of(
-            Pair.of(nodeTraits.replace(IgniteDistributions.random()), Commons.transform(inputTraits,
-                t -> t.replace(IgniteDistributions.random())))
-        );
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits
-    ) {
-        Set<CorrelationId> correlationIds = inTraits.stream()
-            .map(TraitUtils::correlation)
-            .flatMap(corrTr -> corrTr.correlationIds().stream())
-            .collect(Collectors.toSet());
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(correlationIds)),
-            inTraits));
+        return buildRowType();
     }
 
     /** {@inheritDoc} */
-    @Override protected int aggregateFieldsCount() {
+    @Override public int aggregateFieldsCount() {
         return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
similarity index 66%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
index 061cc1c..f38ec09 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
@@ -21,17 +21,13 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
@@ -40,55 +36,11 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
- * Physical node for MAP phase of MINUS (EXCEPT) operator.
+ * Physical node for MAP phase of set op (MINUS, INTERSECT).
  */
-public class IgniteMapMinus extends IgniteMinusBase {
-    /** */
-    public IgniteMapMinus(
-        RelOptCluster cluster,
-        RelTraitSet traitSet,
-        List<RelNode> inputs,
-        boolean all
-    ) {
-        super(cluster, traitSet, inputs, all);
-    }
-
-    /** */
-    public IgniteMapMinus(RelInput input) {
-        super(input);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteMapMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-        return new IgniteMapMinus(getCluster(), traitSet, inputs, all);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteMapMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected RelDataType deriveRowType() {
-        RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
-
-        assert typeFactory instanceof IgniteTypeFactory;
-
-        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
-
-        builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
-        builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
-
-        return builder.build();
-    }
-
+public interface IgniteMapSetOp extends IgniteSetOp {
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
     ) {
@@ -104,24 +56,24 @@ public class IgniteMapMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
     ) {
         if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
             return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
 
-        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
-            return ImmutableList.of(); // Mixing of single and random is prohibited.
-
         return ImmutableList.of(
             Pair.of(nodeTraits.replace(IgniteDistributions.random()), Commons.transform(inputTraits,
-                t -> t.replace(IgniteDistributions.random())))
+                t -> TraitUtils.distribution(t) == IgniteDistributions.broadcast() ?
+                    // Allow broadcast with trim-exchange to be used in map-reduce set-op.
+                    t.replace(IgniteDistributions.hash(ImmutableList.of(0))) :
+                    t.replace(IgniteDistributions.random())))
         );
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits
     ) {
@@ -134,8 +86,22 @@ public class IgniteMapMinus extends IgniteMinusBase {
             inTraits));
     }
 
+    /** Build RowType for MAP node. */
+    public default RelDataType buildRowType() {
+        RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
+
+        assert typeFactory instanceof IgniteTypeFactory;
+
+        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+
+        builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
+        builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
+
+        return builder.build();
+    }
+
     /** {@inheritDoc} */
-    @Override protected int aggregateFieldsCount() {
-        return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
+    @Override public default AggregateType aggregateType() {
+        return AggregateType.MAP;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinus.java
similarity index 53%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinus.java
index b2cf77b..1a57a4b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinus.java
@@ -18,59 +18,35 @@
 package org.apache.ignite.internal.processors.query.calcite.rel.set;
 
 import java.util.List;
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  * Base class for physical MINUS (EXCEPT) set op.
  */
-public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgniteRel {
+public abstract class IgniteMinus extends Minus implements IgniteSetOp {
     /** Count of counter fields used to aggregate results. */
     protected static final int COUNTER_FIELDS_CNT = 2;
 
     /** */
-    IgniteMinusBase(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+    IgniteMinus(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
         super(cluster, traits, inputs, all);
     }
 
     /** {@inheritDoc} */
-    protected IgniteMinusBase(RelInput input) {
+    protected IgniteMinus(RelInput input) {
         super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
     }
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Operation erases collation.
-        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Operation erases collation.
-        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY))));
-    }
-
-    /** Gets count of fields for aggregation for this node. Required for memory consumption calculation. */
-    protected abstract int aggregateFieldsCount();
-
-    /** {@inheritDoc} */
     @Override public double estimateRowCount(RelMetadataQuery mq) {
         final List<RelNode> inputs = getInputs();
 
@@ -84,17 +60,11 @@ public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgnite
 
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
-
-        double rows = estimateRowCount(mq);
-
-        double inputRows = 0;
-
-        for (RelNode input : getInputs())
-            inputRows += mq.getRowCount(input);
-
-        double mem = 0.5 * inputRows * aggregateFieldsCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+        return computeSetOpCost(planner, mq);
+    }
 
-        return costFactory.makeCost(rows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
+    /** {@inheritDoc} */
+    @Override public boolean all() {
+        return all;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceIntersect.java
similarity index 50%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceIntersect.java
index c068e8e..63a5765 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceIntersect.java
@@ -27,22 +27,16 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
- * Physical node for REDUCE phase of MINUS (EXCEPT) operator.
+ * Physical node for REDUCE phase of INTERSECT operator.
  */
-public class IgniteReduceMinus extends IgniteMinusBase {
+public class IgniteReduceIntersect extends IgniteIntersect implements IgniteReduceSetOp {
     /** */
-    public IgniteReduceMinus(
+    public IgniteReduceIntersect(
         RelOptCluster cluster,
         RelTraitSet traitSet,
         RelNode input,
@@ -55,7 +49,7 @@ public class IgniteReduceMinus extends IgniteMinusBase {
     }
 
     /** */
-    public IgniteReduceMinus(RelInput input) {
+    public IgniteReduceIntersect(RelInput input) {
         this(
             input.getCluster(),
             input.getTraitSet().replace(IgniteConvention.INSTANCE),
@@ -74,53 +68,13 @@ public class IgniteReduceMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        return ImmutableList.of(
-            Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
-
-        if (IgniteDistributions.single().satisfies(distr)) {
-            return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
-                Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()),
-            ImmutableList.of(sole(inputTraits).replace(IgniteDistributions.single()))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits
-    ) {
-        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
-            inTraits));
-    }
-
-    /** {@inheritDoc} */
     @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-        return new IgniteReduceMinus(getCluster(), traitSet, sole(inputs), all, rowType);
+        return new IgniteReduceIntersect(getCluster(), traitSet, sole(inputs), all, rowType);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteReduceMinus clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteReduceMinus(cluster, getTraitSet(), sole(inputs), all, rowType);
+    @Override public IgniteReduceIntersect clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteReduceIntersect(cluster, getTraitSet(), sole(inputs), all, rowType);
     }
 
     /** {@inheritDoc} */
@@ -129,7 +83,7 @@ public class IgniteReduceMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int aggregateFieldsCount() {
-        return rowType.getFieldCount() + COUNTER_FIELDS_CNT;
+    @Override public int aggregateFieldsCount() {
+        return rowType.getFieldCount() + 2 /* At least two fields required for count aggregation. */;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
index c068e8e..63ca97e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
@@ -27,20 +27,14 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  * Physical node for REDUCE phase of MINUS (EXCEPT) operator.
  */
-public class IgniteReduceMinus extends IgniteMinusBase {
+public class IgniteReduceMinus extends IgniteMinus implements IgniteReduceSetOp {
     /** */
     public IgniteReduceMinus(
         RelOptCluster cluster,
@@ -74,46 +68,6 @@ public class IgniteReduceMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        return ImmutableList.of(
-            Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
-
-        if (IgniteDistributions.single().satisfies(distr)) {
-            return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
-                Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()),
-            ImmutableList.of(sole(inputTraits).replace(IgniteDistributions.single()))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits
-    ) {
-        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
-            inTraits));
-    }
-
-    /** {@inheritDoc} */
     @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
         return new IgniteReduceMinus(getCluster(), traitSet, sole(inputs), all, rowType);
     }
@@ -129,7 +83,7 @@ public class IgniteReduceMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int aggregateFieldsCount() {
+    @Override public int aggregateFieldsCount() {
         return rowType.getFieldCount() + COUNTER_FIELDS_CNT;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceSetOp.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceSetOp.java
new file mode 100644
index 0000000..c618581
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceSetOp.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for REDUCE phase of set op (MINUS, INTERSECT).
+ */
+public interface IgniteReduceSetOp extends IgniteSetOp {
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        return ImmutableList.of(
+            Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        if (TraitUtils.distribution(nodeTraits) == IgniteDistributions.single())
+            return Pair.of(nodeTraits, Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()),
+            ImmutableList.of(inputTraits.get(0).replace(IgniteDistributions.single()))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+            inTraits));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default AggregateType aggregateType() {
+        return AggregateType.REDUCE;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
similarity index 58%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
index b2cf77b..e085c43 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
@@ -19,75 +19,49 @@ package org.apache.ignite.internal.processors.query.calcite.rel.set;
 
 import java.util.List;
 import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
- * Base class for physical MINUS (EXCEPT) set op.
+ * Base interface for physical set op node (MINUS, INTERSECT).
  */
-public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgniteRel {
-    /** Count of counter fields used to aggregate results. */
-    protected static final int COUNTER_FIELDS_CNT = 2;
-
-    /** */
-    IgniteMinusBase(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
-        super(cluster, traits, inputs, all);
-    }
+public interface IgniteSetOp extends TraitsAwareIgniteRel {
+    /** ALL flag of set op. */
+    public boolean all();
 
     /** {@inheritDoc} */
-    protected IgniteMinusBase(RelInput input) {
-        super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits) {
         // Operation erases collation.
         return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
             Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits) {
         // Operation erases collation.
         return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
             Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY))));
     }
 
     /** Gets count of fields for aggregation for this node. Required for memory consumption calculation. */
-    protected abstract int aggregateFieldsCount();
-
-    /** {@inheritDoc} */
-    @Override public double estimateRowCount(RelMetadataQuery mq) {
-        final List<RelNode> inputs = getInputs();
+    public int aggregateFieldsCount();
 
-        double rows = mq.getRowCount(inputs.get(0));
-
-        for (int i = 1; i < inputs.size(); i++)
-            rows -= 0.5 * Math.min(rows, mq.getRowCount(inputs.get(i)));
-
-        return rows;
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    /** Compute cost for set op. */
+    public default RelOptCost computeSetOpCost(RelOptPlanner planner, RelMetadataQuery mq) {
         IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
 
-        double rows = estimateRowCount(mq);
-
         double inputRows = 0;
 
         for (RelNode input : getInputs())
@@ -95,6 +69,9 @@ public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgnite
 
         double mem = 0.5 * inputRows * aggregateFieldsCount() * IgniteCost.AVERAGE_FIELD_SIZE;
 
-        return costFactory.makeCost(rows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
+        return costFactory.makeCost(inputRows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
     }
+
+    /** Aggregate type. */
+    public AggregateType aggregateType();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleIntersect.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleIntersect.java
new file mode 100644
index 0000000..1225146
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleIntersect.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for INTERSECT operator which inputs satisfy SINGLE distribution.
+ */
+public class IgniteSingleIntersect extends IgniteIntersect implements IgniteSingleSetOp {
+    /** {@inheritDoc} */
+    public IgniteSingleIntersect(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        List<RelNode> inputs,
+        boolean all
+    ) {
+        super(cluster, traitSet, inputs, all);
+    }
+
+    /** */
+    public IgniteSingleIntersect(RelInput input) {
+        super(input);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteSingleIntersect copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteSingleIntersect(getCluster(), traitSet, inputs, all);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteSingleIntersect(cluster, getTraitSet(), Commons.cast(inputs), all);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int aggregateFieldsCount() {
+        return getInput(0).getRowType().getFieldCount() + getInputs().size();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
index 3165bb5..3f83a2a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
@@ -18,28 +18,18 @@
 package org.apache.ignite.internal.processors.query.calcite.rel.set;
 
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  * Physical node for MINUS (EXCEPT) operator which inputs satisfy SINGLE distribution.
  */
-public class IgniteSingleMinus extends IgniteMinusBase {
+public class IgniteSingleMinus extends IgniteMinus implements IgniteSingleSetOp {
     /** {@inheritDoc} */
     public IgniteSingleMinus(
         RelOptCluster cluster,
@@ -56,63 +46,6 @@ public class IgniteSingleMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        boolean rewindable = inputTraits.stream()
-            .map(TraitUtils::rewindability)
-            .allMatch(RewindabilityTrait::rewindable);
-
-        if (rewindable)
-            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
-            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
-        IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
-
-        if (IgniteDistributions.single().satisfies(distr)) {
-            return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
-                Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inputTraits
-    ) {
-        boolean single = inputTraits.stream()
-            .map(TraitUtils::distribution)
-            .allMatch(d -> d.satisfies(IgniteDistributions.single()));
-
-        if (!single)
-            return ImmutableList.of();
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), inputTraits));
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
-        RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits
-    ) {
-        Set<CorrelationId> correlationIds = inTraits.stream()
-            .map(TraitUtils::correlation)
-            .flatMap(corrTr -> corrTr.correlationIds().stream())
-            .collect(Collectors.toSet());
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(correlationIds)),
-            inTraits));
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteSingleMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
         return new IgniteSingleMinus(getCluster(), traitSet, inputs, all);
     }
@@ -128,7 +61,7 @@ public class IgniteSingleMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int aggregateFieldsCount() {
+    @Override public int aggregateFieldsCount() {
         return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleSetOp.java
similarity index 58%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleSetOp.java
index 3165bb5..dc20a70 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleSetOp.java
@@ -21,42 +21,22 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
- * Physical node for MINUS (EXCEPT) operator which inputs satisfy SINGLE distribution.
+ * Physical node for set op (MINUS, INTERSECT) operator which inputs satisfy SINGLE distribution.
  */
-public class IgniteSingleMinus extends IgniteMinusBase {
+public interface IgniteSingleSetOp extends IgniteSetOp {
     /** {@inheritDoc} */
-    public IgniteSingleMinus(
-        RelOptCluster cluster,
-        RelTraitSet traitSet,
-        List<RelNode> inputs,
-        boolean all
-    ) {
-        super(cluster, traitSet, inputs, all);
-    }
-
-    /** */
-    public IgniteSingleMinus(RelInput input) {
-        super(input);
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
     ) {
@@ -72,19 +52,16 @@ public class IgniteSingleMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
-        IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
-
-        if (IgniteDistributions.single().satisfies(distr)) {
-            return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
-                Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
-        }
+    @Override public default Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        if (TraitUtils.distribution(nodeTraits) == IgniteDistributions.single())
+            return Pair.of(nodeTraits, Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
 
         return null;
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inputTraits
     ) {
@@ -99,7 +76,7 @@ public class IgniteSingleMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
         RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits
     ) {
@@ -113,22 +90,7 @@ public class IgniteSingleMinus extends IgniteMinusBase {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteSingleMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-        return new IgniteSingleMinus(getCluster(), traitSet, inputs, all);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteSingleMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int aggregateFieldsCount() {
-        return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
+    @Override public default AggregateType aggregateType() {
+        return AggregateType.SINGLE;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java
deleted file mode 100644
index 7e11c0c..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.logical.LogicalMinus;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-
-/**
- * MINUS (EXCEPT) operation converter rule.
- */
-public class MinusConverterRule {
-    /** */
-    public static final RelOptRule SINGLE = new SingleMinusConverterRule();
-
-    /** */
-    public static final RelOptRule MAP_REDUCE = new MapReduceMinusConverterRule();
-
-    /** */
-    private MinusConverterRule() {
-        // No-op.
-    }
-
-    /** */
-    private static class SingleMinusConverterRule extends AbstractIgniteConverterRule<LogicalMinus> {
-        /** */
-        SingleMinusConverterRule() {
-            super(LogicalMinus.class, "SingleMinusConverterRule");
-        }
-
-        /** {@inheritDoc} */
-        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalMinus setOp) {
-            RelOptCluster cluster = setOp.getCluster();
-            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
-            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
-            List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
-
-            return new IgniteSingleMinus(cluster, outTrait, inputs, setOp.all);
-        }
-    }
-
-    /** */
-    private static class MapReduceMinusConverterRule extends AbstractIgniteConverterRule<LogicalMinus> {
-        /** */
-        MapReduceMinusConverterRule() {
-            super(LogicalMinus.class, "MapReduceMinusConverterRule");
-        }
-
-        /** {@inheritDoc} */
-        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalMinus setOp) {
-            RelOptCluster cluster = setOp.getCluster();
-            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
-            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
-            List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
-
-            RelNode map = new IgniteMapMinus(cluster, outTrait, inputs, setOp.all);
-
-            return new IgniteReduceMinus(
-                cluster,
-                outTrait.replace(IgniteDistributions.single()),
-                convert(map, inTrait.replace(IgniteDistributions.single())),
-                setOp.all,
-                cluster.getTypeFactory().leastRestrictive(Util.transform(inputs, RelNode::getRowType))
-            );
-        }
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java
new file mode 100644
index 0000000..980685c
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapIntersect;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceIntersect;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleIntersect;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+
+/**
+ * Set op (MINUS, INTERSECT) converter rule.
+ */
+public class SetOpConverterRule {
+    /** */
+    public static final RelOptRule SINGLE_MINUS = new SingleMinusConverterRule();
+
+    /** */
+    public static final RelOptRule SINGLE_INTERSECT = new SingleIntersectConverterRule();
+
+    /** */
+    public static final RelOptRule MAP_REDUCE_MINUS = new MapReduceMinusConverterRule();
+
+    /** */
+    public static final RelOptRule MAP_REDUCE_INTERSECT = new MapReduceIntersectConverterRule();
+
+    /** */
+    private SetOpConverterRule() {
+        // No-op.
+    }
+
+    /** */
+    private abstract static class SingleSetOpConverterRule<T extends SetOp> extends AbstractIgniteConverterRule<T> {
+        /** */
+        SingleSetOpConverterRule(Class<T> cls, String desc) {
+            super(cls, desc);
+        }
+
+        /** Node factory method. */
+        abstract PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all);
+
+        /** {@inheritDoc} */
+        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, T setOp) {
+            RelOptCluster cluster = setOp.getCluster();
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+            List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
+
+            return createNode(cluster, outTrait, inputs, setOp.all);
+        }
+    }
+
+    /** */
+    private static class SingleMinusConverterRule extends SingleSetOpConverterRule<LogicalMinus> {
+        /** */
+        SingleMinusConverterRule() {
+            super(LogicalMinus.class, "SingleMinusConverterRule");
+        }
+
+        /** {@inheritDoc} */
+        @Override PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+            boolean all) {
+            return new IgniteSingleMinus(cluster, traits, inputs, all);
+        }
+    }
+
+    /** */
+    private static class SingleIntersectConverterRule extends SingleSetOpConverterRule<LogicalIntersect> {
+        /** */
+        SingleIntersectConverterRule() {
+            super(LogicalIntersect.class, "SingleIntersectConverterRule");
+        }
+
+        /** {@inheritDoc} */
+        @Override PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+            boolean all) {
+            return new IgniteSingleIntersect(cluster, traits, inputs, all);
+        }
+    }
+
+    /** */
+    private abstract static class MapReduceSetOpConverterRule<T extends SetOp> extends AbstractIgniteConverterRule<T> {
+        /** */
+        MapReduceSetOpConverterRule(Class<T> cls, String desc) {
+            super(cls, desc);
+        }
+
+        /** Map node factory method. */
+        abstract PhysicalNode createMapNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+            boolean all);
+
+        /** Reduce node factory method. */
+        abstract PhysicalNode createReduceNode(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+            boolean all, RelDataType rowType);
+
+        /** {@inheritDoc} */
+        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, T setOp) {
+            RelOptCluster cluster = setOp.getCluster();
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+            List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
+
+            RelNode map = createMapNode(cluster, outTrait, inputs, setOp.all);
+
+            return createReduceNode(
+                cluster,
+                outTrait.replace(IgniteDistributions.single()),
+                convert(map, inTrait.replace(IgniteDistributions.single())),
+                setOp.all,
+                cluster.getTypeFactory().leastRestrictive(Util.transform(inputs, RelNode::getRowType))
+            );
+        }
+    }
+
+    /** */
+    private static class MapReduceMinusConverterRule extends MapReduceSetOpConverterRule<LogicalMinus> {
+        /** */
+        MapReduceMinusConverterRule() {
+            super(LogicalMinus.class, "MapReduceMinusConverterRule");
+        }
+
+        /** {@inheritDoc} */
+        @Override PhysicalNode createMapNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+            boolean all) {
+            return new IgniteMapMinus(cluster, traits, inputs, all);
+        }
+
+        /** {@inheritDoc} */
+        @Override PhysicalNode createReduceNode(RelOptCluster cluster, RelTraitSet traits, RelNode input, boolean all,
+            RelDataType rowType) {
+            return new IgniteReduceMinus(cluster, traits, input, all, rowType);
+        }
+    }
+
+    /** */
+    private static class MapReduceIntersectConverterRule extends MapReduceSetOpConverterRule<LogicalIntersect> {
+        /** */
+        MapReduceIntersectConverterRule() {
+            super(LogicalIntersect.class, "MapReduceIntersectConverterRule");
+        }
+
+        /** {@inheritDoc} */
+        @Override PhysicalNode createMapNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+            boolean all) {
+            return new IgniteMapIntersect(cluster, traits, inputs, all);
+        }
+
+        /** {@inheritDoc} */
+        @Override PhysicalNode createReduceNode(RelOptCluster cluster, RelTraitSet traits, RelNode input, boolean all,
+            RelDataType rowType) {
+            return new IgniteReduceIntersect(cluster, traits, input, all, rowType);
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index d2aa509..84aa2f0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -25,14 +25,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-
-import javax.cache.Cache;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
@@ -484,25 +481,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         awaitPartitionMapExchange(true, true, null);
     }
 
-    /** Copy cache with it's content to new replicated cache. */
-    private void copyCacheAsReplicated(String cacheName) throws InterruptedException {
-        IgniteCache<Object, Object> cache = client.cache(cacheName);
-
-        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<Object, Object>(
-            cache.getConfiguration(CacheConfiguration.class));
-
-        ccfg.setName(cacheName + "Replicated");
-        ccfg.setCacheMode(CacheMode.REPLICATED);
-        ccfg.getQueryEntities().forEach(qe -> qe.setTableName(qe.getTableName() + "_repl"));
-
-        IgniteCache<Object, Object> replCache = client.getOrCreateCache(ccfg);
-
-        for (Cache.Entry<?, ?> entry : cache)
-            replCache.put(entry.getKey(), entry.getValue());
-
-        awaitPartitionMapExchange(true, true, null);
-    }
-
     /** */
     @Test
     public void testOrderingByColumnOutsideSelectList() throws InterruptedException {
@@ -701,207 +679,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         assertEquals(1, rows.size());
     }
 
-    /** */
-    @Test
-    public void testExcept() throws Exception {
-        populateTables();
-
-        List<List<?>> rows = sql("SELECT name FROM Orders EXCEPT SELECT name from Account");
-
-        assertEquals(1, rows.size());
-        assertEquals("Igor", rows.get(0).get(0));
-    }
-
-    /** */
-    @Test
-    public void testExceptFromEmpty() throws Exception {
-        populateTables();
-
-        copyCacheAsReplicated("orders");
-        copyCacheAsReplicated("account");
-
-        List<List<?>> rows = sql("SELECT name FROM Orders WHERE salary < 0 EXCEPT SELECT name FROM Account");
-
-        assertEquals(0, rows.size());
-
-        rows = sql("SELECT name FROM Orders_repl WHERE salary < 0 EXCEPT SELECT name FROM Account_repl");
-
-        assertEquals(0, rows.size());
-    }
-
-    /** */
-    @Test
-    public void testExceptSeveralColumns() throws Exception {
-        populateTables();
-
-        List<List<?>> rows = sql("SELECT name, salary FROM Orders EXCEPT SELECT name, salary from Account");
-
-        assertEquals(4, rows.size());
-        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
-    }
-
-    /** */
-    @Test
-    public void testExceptAll() throws Exception {
-        populateTables();
-
-        List<List<?>> rows = sql("SELECT name FROM Orders EXCEPT ALL SELECT name from Account");
-
-        assertEquals(4, rows.size());
-        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
-    }
-
-    /** */
-    @Test
-    public void testExceptNested() throws Exception {
-        populateTables();
-
-        List<List<?>> rows =
-            sql("SELECT name FROM Orders EXCEPT (SELECT name FROM Orders EXCEPT SELECT name from Account)");
-
-        assertEquals(2, rows.size());
-        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
-    }
-
-    /** */
-    @Test
-    public void testExceptReplicatedWithPartitioned() throws Exception {
-        populateTables();
-        copyCacheAsReplicated("orders");
-
-        List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name from Account");
-
-        assertEquals(4, rows.size());
-        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
-    }
-
-    /** */
-    @Test
-    public void testExceptReplicated() throws Exception {
-        populateTables();
-        copyCacheAsReplicated("orders");
-        copyCacheAsReplicated("account");
-
-        List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name from Account_repl");
-
-        assertEquals(4, rows.size());
-        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
-    }
-
-    /** */
-    @Test
-    public void testExceptMerge() throws Exception {
-        populateTables();
-        copyCacheAsReplicated("orders");
-
-        List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name FROM Account EXCEPT ALL " +
-            "SELECT name FROM orders WHERE salary < 11");
-
-        assertEquals(3, rows.size());
-        assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor")));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
-    }
-
-    /** */
-    @Test
-    public void testExceptBigBatch() throws Exception {
-        client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
-            .setName("cache1")
-            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table1")))
-            .setBackups(2)
-        );
-
-        client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
-            .setName("cache2")
-            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table2")))
-            .setBackups(1)
-        );
-
-        copyCacheAsReplicated("cache1");
-        copyCacheAsReplicated("cache2");
-
-        try (IgniteDataStreamer<Integer, Integer> ds1 = client.dataStreamer("cache1");
-             IgniteDataStreamer<Integer, Integer> ds2 = client.dataStreamer("cache2");
-             IgniteDataStreamer<Integer, Integer> ds3 = client.dataStreamer("cache1Replicated");
-             IgniteDataStreamer<Integer, Integer> ds4 = client.dataStreamer("cache2Replicated")
-        ) {
-            int key = 0;
-
-            for (int i = 0; i < 5; i++) {
-                for (int j = 0; j < ((i == 0) ? 1 : (1 << (i * 4 - 1))); j++) {
-                    // Cache1 keys count: 1 of "0", 8 of "1", 128 of "2", 2048 of "3", 32768 of "4".
-                    ds1.addData(key++, i);
-                    ds3.addData(key++, i);
-
-                    // Cache2 keys count: 1 of "5", 128 of "3", 32768 of "1".
-                    if ((i & 1) == 0) {
-                        ds2.addData(key++, 5 - i);
-                        ds4.addData(key++, 5 - i);
-                    }
-                }
-            }
-        }
-
-        awaitPartitionMapExchange(true, true, null);
-
-        List<List<?>> rows;
-
-        // Check 2 partitioned caches.
-        rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT SELECT _val FROM \"cache2\".table2");
-
-        assertEquals(3, rows.size());
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
-
-        rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT ALL SELECT _val FROM \"cache2\".table2");
-
-        assertEquals(34817, rows.size());
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
-        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
-        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
-        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
-
-        // Check 1 replicated and 1 partitioned caches.
-        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2\".table2");
-
-        assertEquals(3, rows.size());
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
-
-        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2\".table2");
-
-        assertEquals(34817, rows.size());
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
-        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
-        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
-        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
-
-        // Check 2 replicated caches.
-        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2Replicated\"" +
-            ".table2_repl");
-
-        assertEquals(3, rows.size());
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
-
-        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2Replicated\"" +
-            ".table2_repl");
-
-        assertEquals(34817, rows.size());
-        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
-        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
-        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
-        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
-    }
-
     /**
      * Execute SQL statement on given node.
      *
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpExecutionTest.java
similarity index 66%
copy from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
copy to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpExecutionTest.java
index bb7b32c..938071c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpExecutionTest.java
@@ -22,12 +22,14 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -40,9 +42,9 @@ import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.A
 import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
 
 /**
- * Test execution of MINUS (EXCEPT) operator.
+ * Abstract test for set operator (MINUS, INTERSECT) execution.
  */
-public class MinusExecutionTest extends AbstractExecutionTest {
+public abstract class AbstractSetOpExecutionTest extends AbstractExecutionTest {
     /**
      * @throws Exception If failed.
      */
@@ -55,25 +57,25 @@ public class MinusExecutionTest extends AbstractExecutionTest {
     /** */
     @Test
     public void testSingle() {
-        checkMinus(true, false);
+        checkSetOp(true, false);
     }
 
     /** */
     @Test
     public void testSingleAll() {
-        checkMinus(true, true);
+        checkSetOp(true, true);
     }
 
     /** */
     @Test
     public void testMapReduce() {
-        checkMinus(false, false);
+        checkSetOp(false, false);
     }
 
     /** */
     @Test
     public void testMapReduceAll() {
-        checkMinus(false, true);
+        checkSetOp(false, true);
     }
 
     /** */
@@ -88,11 +90,12 @@ public class MinusExecutionTest extends AbstractExecutionTest {
             row("Roman", 1)
         );
 
-        // For "single minus" operation, node should not request rows from the next source if result after the previous
-        // source is already empty.
+        // For single distribution set operations, node should not request rows from the next source if result after
+        // the previous source is already empty.
         ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, data);
         ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, data);
-        Node<Object[]> node3 = new AbstractNode<Object[]>(ctx, rowType) {
+        ScanNode<Object[]> scan3 = new ScanNode<>(ctx, rowType, Collections.emptyList());
+        Node<Object[]> node4 = new AbstractNode<Object[]>(ctx, rowType) {
             @Override protected void rewindInternal() {
                 // No-op.
             }
@@ -106,101 +109,71 @@ public class MinusExecutionTest extends AbstractExecutionTest {
             }
         };
 
-        MinusNode<Object[]> minusNode = new MinusNode<>(ctx, rowType, SINGLE, false, rowFactory());
-        minusNode.register(Arrays.asList(scan1, scan2, node3));
+        List<Node<Object[]>> inputs = Arrays.asList(scan1, scan2, scan3, node4);
+
+        AbstractSetOpNode<Object[]> setOpNode = setOpNodeFactory(ctx, rowType, SINGLE, false, inputs.size());
+        setOpNode.register(inputs);
 
         RootNode<Object[]> root = new RootNode<>(ctx, rowType);
-        root.register(minusNode);
+        root.register(setOpNode);
 
         assertFalse(root.hasNext());
     }
 
+    /** */
+    protected abstract void checkSetOp(boolean single, boolean all);
+
     /**
      * @param single Single.
      * @param all All.
      */
-    private void checkMinus(boolean single, boolean all) {
+    protected void checkSetOp(boolean single, boolean all, List<List<Object[]>> dataSets, List<Object[]> expectedResult) {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
 
-        ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 1),
-            row("Roman", 1),
-            row("Igor", 1),
-            row("Roman", 2),
-            row("Igor", 1),
-            row("Igor", 1),
-            row("Igor", 2)
-        ));
+        List<Node<Object[]>> inputs = dataSets.stream().map(ds -> new ScanNode<>(ctx, rowType, ds))
+            .collect(Collectors.toList());
 
-        ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, Arrays.asList(
-            row("Igor", 1),
-            row("Roman", 1),
-            row("Igor", 1),
-            row("Alexey", 1)
-        ));
-
-        MinusNode<Object[]> minusNode;
-
-        if (single) {
-            minusNode = new MinusNode<>(
-                ctx,
-                rowType,
-                SINGLE,
-                all,
-                rowFactory()
-            );
-        }
-        else {
-            minusNode = new MinusNode<>(
-                ctx,
-                rowType,
-                MAP,
-                all,
-                rowFactory()
-            );
-        }
+        AbstractSetOpNode<Object[]> setOpNode;
+
+        if (single)
+            setOpNode = setOpNodeFactory(ctx, rowType, SINGLE, all, inputs.size());
+        else
+            setOpNode = setOpNodeFactory(ctx, rowType, MAP, all, inputs.size());
 
-        minusNode.register(Arrays.asList(scan1, scan2));
+        setOpNode.register(inputs);
 
         if (!single) {
-            MinusNode<Object[]> reduceNode = new MinusNode<>(
-                ctx,
-                rowType,
-                REDUCE,
-                all,
-                rowFactory()
-            );
+            AbstractSetOpNode<Object[]> reduceNode = setOpNodeFactory(ctx, rowType, REDUCE, all, 1);
 
-            reduceNode.register(Collections.singletonList(minusNode));
+            reduceNode.register(Collections.singletonList(setOpNode));
 
-            minusNode = reduceNode;
+            setOpNode = reduceNode;
         }
 
         Comparator<Object[]> cmp = ctx.expressionFactory().comparator(RelCollations.of(ImmutableIntList.of(0, 1)));
 
         // Create sort node on the top to check sorted results.
         SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType, cmp);
-        sortNode.register(minusNode);
+        sortNode.register(setOpNode);
 
         RootNode<Object[]> root = new RootNode<>(ctx, rowType);
         root.register(sortNode);
 
-        assertTrue(root.hasNext());
+        assertTrue(F.isEmpty(expectedResult) || root.hasNext());
 
-        if (all) {
-            Assert.assertArrayEquals(row("Igor", 1), root.next());
-            Assert.assertArrayEquals(row("Igor", 1), root.next());
-        }
-
-        Assert.assertArrayEquals(row("Igor", 2), root.next());
-        Assert.assertArrayEquals(row("Roman", 2), root.next());
+        for (Object[] row : expectedResult)
+            Assert.assertArrayEquals(row, root.next());
 
         assertFalse(root.hasNext());
     }
 
     /** */
+    protected abstract AbstractSetOpNode<Object[]> setOpNodeFactory(ExecutionContext<Object[]> ctx, RelDataType rowType,
+        AggregateType type, boolean all, int inputsCnt);
+
+    /** */
     protected RowHandler.RowFactory<Object[]> rowFactory() {
         return new RowHandler.RowFactory<Object[]>() {
             /** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectExecutionTest.java
new file mode 100644
index 0000000..31e6169
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectExecutionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+
+/**
+ * Test execution of INTERSECT operator.
+ */
+public class IntersectExecutionTest extends AbstractSetOpExecutionTest {
+    /** {@inheritDoc} */
+    @Override protected AbstractSetOpNode<Object[]> setOpNodeFactory(ExecutionContext<Object[]> ctx,
+        RelDataType rowType, AggregateType type, boolean all, int inputsCnt) {
+        return new IntersectNode<>(ctx, rowType, type, all, rowFactory(), inputsCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkSetOp(boolean single, boolean all) {
+        List<Object[]> ds1 = Arrays.asList(
+            row("Igor", 1),
+            row("Roman", 1),
+            row("Igor", 1),
+            row("Roman", 2),
+            row("Igor", 1),
+            row("Igor", 1),
+            row("Igor", 2)
+        );
+
+        List<Object[]> ds2 = Arrays.asList(
+            row("Igor", 1),
+            row("Roman", 1),
+            row("Igor", 1),
+            row("Igor", 1),
+            row("Alexey", 1)
+        );
+
+        List<Object[]> ds3 = Arrays.asList(
+            row("Igor", 1),
+            row("Roman", 1),
+            row("Igor", 1),
+            row("Roman", 2),
+            row("Alexey", 2)
+        );
+
+        List<Object[]> expectedResult;
+
+        if (all) {
+            expectedResult = Arrays.asList(
+                row("Igor", 1),
+                row("Igor", 1),
+                row("Roman", 1)
+            );
+        }
+        else {
+            expectedResult = Arrays.asList(
+                row("Igor", 1),
+                row("Roman", 1)
+            );
+        }
+
+        checkSetOp(single, all, Arrays.asList(ds1, ds2, ds3), expectedResult);
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
index bb7b32c..8708c98 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
@@ -18,205 +18,65 @@
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
-import java.util.UUID;
-import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
-import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
-import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 
 /**
  * Test execution of MINUS (EXCEPT) operator.
  */
-public class MinusExecutionTest extends AbstractExecutionTest {
-    /**
-     * @throws Exception If failed.
-     */
-    @Before
-    @Override public void setup() throws Exception {
-        nodesCnt = 1;
-        super.setup();
-    }
-
-    /** */
-    @Test
-    public void testSingle() {
-        checkMinus(true, false);
-    }
-
-    /** */
-    @Test
-    public void testSingleAll() {
-        checkMinus(true, true);
-    }
-
-    /** */
-    @Test
-    public void testMapReduce() {
-        checkMinus(false, false);
-    }
-
-    /** */
-    @Test
-    public void testMapReduceAll() {
-        checkMinus(false, true);
+public class MinusExecutionTest extends AbstractSetOpExecutionTest {
+    /** {@inheritDoc} */
+    @Override protected AbstractSetOpNode<Object[]> setOpNodeFactory(ExecutionContext<Object[]> ctx,
+        RelDataType rowType, AggregateType type, boolean all, int inputsCnt) {
+        return new MinusNode<>(ctx, rowType, type, all, rowFactory());
     }
 
-    /** */
-    @Test
-    public void testSingleWithEmptySet() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-
-        List<Object[]> data = Arrays.asList(
-            row("Igor", 1),
-            row("Roman", 1)
-        );
-
-        // For "single minus" operation, node should not request rows from the next source if result after the previous
-        // source is already empty.
-        ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, data);
-        ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, data);
-        Node<Object[]> node3 = new AbstractNode<Object[]>(ctx, rowType) {
-            @Override protected void rewindInternal() {
-                // No-op.
-            }
-
-            @Override protected Downstream<Object[]> requestDownstream(int idx) {
-                return null;
-            }
-
-            @Override public void request(int rowsCnt) throws Exception {
-                fail("Node should not be requested");
-            }
-        };
-
-        MinusNode<Object[]> minusNode = new MinusNode<>(ctx, rowType, SINGLE, false, rowFactory());
-        minusNode.register(Arrays.asList(scan1, scan2, node3));
-
-        RootNode<Object[]> root = new RootNode<>(ctx, rowType);
-        root.register(minusNode);
-
-        assertFalse(root.hasNext());
-    }
-
-    /**
-     * @param single Single.
-     * @param all All.
-     */
-    private void checkMinus(boolean single, boolean all) {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-
-        ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, Arrays.asList(
+    /** {@inheritDoc} */
+    @Override protected void checkSetOp(boolean single, boolean all) {
+        List<Object[]> ds1 = Arrays.asList(
             row("Igor", 1),
             row("Roman", 1),
             row("Igor", 1),
             row("Roman", 2),
             row("Igor", 1),
             row("Igor", 1),
-            row("Igor", 2)
-        ));
+            row("Igor", 1),
+            row("Igor", 2),
+            row("Alexey", 2)
+        );
 
-        ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, Arrays.asList(
+        List<Object[]> ds2 = Arrays.asList(
             row("Igor", 1),
             row("Roman", 1),
             row("Igor", 1),
             row("Alexey", 1)
-        ));
+        );
+
+        List<Object[]> ds3 = Arrays.asList(
+            row("Igor", 1),
+            row("Alexey", 1),
+            row("Alexey", 2)
+        );
 
-        MinusNode<Object[]> minusNode;
+        List<Object[]> expectedResult;
 
-        if (single) {
-            minusNode = new MinusNode<>(
-                ctx,
-                rowType,
-                SINGLE,
-                all,
-                rowFactory()
+        if (all) {
+            expectedResult = Arrays.asList(
+                row("Igor", 1),
+                row("Igor", 1),
+                row("Igor", 2),
+                row("Roman", 2)
             );
         }
         else {
-            minusNode = new MinusNode<>(
-                ctx,
-                rowType,
-                MAP,
-                all,
-                rowFactory()
-            );
-        }
-
-        minusNode.register(Arrays.asList(scan1, scan2));
-
-        if (!single) {
-            MinusNode<Object[]> reduceNode = new MinusNode<>(
-                ctx,
-                rowType,
-                REDUCE,
-                all,
-                rowFactory()
+            expectedResult = Arrays.asList(
+                row("Igor", 2),
+                row("Roman", 2)
             );
-
-            reduceNode.register(Collections.singletonList(minusNode));
-
-            minusNode = reduceNode;
         }
 
-        Comparator<Object[]> cmp = ctx.expressionFactory().comparator(RelCollations.of(ImmutableIntList.of(0, 1)));
-
-        // Create sort node on the top to check sorted results.
-        SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType, cmp);
-        sortNode.register(minusNode);
-
-        RootNode<Object[]> root = new RootNode<>(ctx, rowType);
-        root.register(sortNode);
-
-        assertTrue(root.hasNext());
-
-        if (all) {
-            Assert.assertArrayEquals(row("Igor", 1), root.next());
-            Assert.assertArrayEquals(row("Igor", 1), root.next());
-        }
-
-        Assert.assertArrayEquals(row("Igor", 2), root.next());
-        Assert.assertArrayEquals(row("Roman", 2), root.next());
-
-        assertFalse(root.hasNext());
-    }
-
-    /** */
-    protected RowHandler.RowFactory<Object[]> rowFactory() {
-        return new RowHandler.RowFactory<Object[]>() {
-            /** */
-            @Override public RowHandler<Object[]> handler() {
-                return ArrayRowHandler.INSTANCE;
-            }
-
-            /** */
-            @Override public Object[] create() {
-                throw new AssertionError();
-            }
-
-            /** */
-            @Override public Object[] create(Object... fields) {
-                return fields;
-            }
-        };
+        checkSetOp(single, all, Arrays.asList(ds1, ds2, ds3), expectedResult);
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 94fe7a7..97a536e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -38,7 +38,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 @WithSystemProperty(key = "calcite.debug", value = "false")
 public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
     /** */
-    private static IgniteEx client;
+    protected static IgniteEx client;
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
new file mode 100644
index 0000000..07a7f55
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.List;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+/**
+ * Integration test for set op (EXCEPT, INTERSECT).
+ */
+public class SetOpIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private void populateTables() throws InterruptedException {
+        IgniteCache<Integer, Employer> emp1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp1")
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp1")))
+            .setBackups(2)
+        );
+
+        IgniteCache<Integer, Employer> emp2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp2")
+            .setSqlSchema("PUBLIC")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp2")))
+            .setBackups(1)
+        );
+
+        emp1.put(1, new Employer("Igor", 10d));
+        emp1.put(2, new Employer("Igor", 11d));
+        emp1.put(3, new Employer("Igor", 12d));
+        emp1.put(4, new Employer("Igor1", 13d));
+        emp1.put(5, new Employer("Igor1", 13d));
+        emp1.put(6, new Employer("Igor1", 13d));
+        emp1.put(7, new Employer("Roman", 14d));
+
+        emp2.put(1, new Employer("Roman", 10d));
+        emp2.put(2, new Employer("Roman", 11d));
+        emp2.put(3, new Employer("Roman", 12d));
+        emp2.put(4, new Employer("Roman", 13d));
+        emp2.put(5, new Employer("Igor1", 13d));
+        emp2.put(6, new Employer("Igor1", 13d));
+
+        awaitPartitionMapExchange(true, true, null);
+    }
+
+    /** Copy cache with it's content to new replicated cache. */
+    private void copyCacheAsReplicated(String cacheName) throws InterruptedException {
+        IgniteCache<Object, Object> cache = client.cache(cacheName);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<Object, Object>(
+            cache.getConfiguration(CacheConfiguration.class));
+
+        ccfg.setName(cacheName + "Replicated");
+        ccfg.setCacheMode(CacheMode.REPLICATED);
+        ccfg.getQueryEntities().forEach(qe -> qe.setTableName(qe.getTableName() + "_repl"));
+
+        IgniteCache<Object, Object> replCache = client.getOrCreateCache(ccfg);
+
+        for (Cache.Entry<?, ?> entry : cache)
+            replCache.put(entry.getKey(), entry.getValue());
+
+        awaitPartitionMapExchange(true, true, null);
+    }
+
+    /** */
+    @Test
+    public void testExcept() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name FROM emp1 EXCEPT SELECT name FROM emp2");
+
+        assertEquals(1, rows.size());
+        assertEquals("Igor", rows.get(0).get(0));
+    }
+
+    /** */
+    @Test
+    public void testExceptFromEmpty() throws Exception {
+        populateTables();
+
+        copyCacheAsReplicated("emp1");
+        copyCacheAsReplicated("emp2");
+
+        List<List<?>> rows = sql("SELECT name FROM emp1 WHERE salary < 0 EXCEPT SELECT name FROM emp2");
+
+        assertEquals(0, rows.size());
+
+        rows = sql("SELECT name FROM emp1_repl WHERE salary < 0 EXCEPT SELECT name FROM emp2_repl");
+
+        assertEquals(0, rows.size());
+    }
+
+    /** */
+    @Test
+    public void testExceptSeveralColumns() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name, salary FROM emp1 EXCEPT SELECT name, salary FROM emp2");
+
+        assertEquals(4, rows.size());
+        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+    }
+
+    /** */
+    @Test
+    public void testExceptAll() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name FROM emp1 EXCEPT ALL SELECT name FROM emp2");
+
+        assertEquals(4, rows.size());
+        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** */
+    @Test
+    public void testExceptNested() throws Exception {
+        populateTables();
+
+        List<List<?>> rows =
+            sql("SELECT name FROM emp1 EXCEPT (SELECT name FROM emp1 EXCEPT SELECT name FROM emp2)");
+
+        assertEquals(2, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** */
+    @Test
+    public void testExceptReplicatedWithPartitioned() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("emp1");
+
+        List<List<?>> rows = sql("SELECT name FROM emp1_repl EXCEPT ALL SELECT name FROM emp2");
+
+        assertEquals(4, rows.size());
+        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** */
+    @Test
+    public void testExceptReplicated() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("emp1");
+        copyCacheAsReplicated("emp2");
+
+        List<List<?>> rows = sql("SELECT name FROM emp1_repl EXCEPT ALL SELECT name FROM emp2_repl");
+
+        assertEquals(4, rows.size());
+        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** */
+    @Test
+    public void testExceptMerge() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("emp1");
+
+        List<List<?>> rows = sql("SELECT name FROM emp1_repl EXCEPT ALL SELECT name FROM emp2 EXCEPT ALL " +
+            "SELECT name FROM emp1 WHERE salary < 11");
+
+        assertEquals(3, rows.size());
+        assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** */
+    @Test
+    public void testSetOpBigBatch() throws Exception {
+        client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+            .setName("cache1")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table1")))
+            .setBackups(2)
+        );
+
+        client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+            .setName("cache2")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table2")))
+            .setBackups(1)
+        );
+
+        copyCacheAsReplicated("cache1");
+        copyCacheAsReplicated("cache2");
+
+        try (IgniteDataStreamer<Integer, Integer> ds1 = client.dataStreamer("cache1");
+             IgniteDataStreamer<Integer, Integer> ds2 = client.dataStreamer("cache2");
+             IgniteDataStreamer<Integer, Integer> ds3 = client.dataStreamer("cache1Replicated");
+             IgniteDataStreamer<Integer, Integer> ds4 = client.dataStreamer("cache2Replicated")
+        ) {
+            int key = 0;
+
+            for (int i = 0; i < 5; i++) {
+                for (int j = 0; j < ((i == 0) ? 1 : (1 << (i * 4 - 1))); j++) {
+                    // Cache1 keys count: 1 of "0", 8 of "1", 128 of "2", 2048 of "3", 32768 of "4".
+                    ds1.addData(key++, i);
+                    ds3.addData(key++, i);
+
+                    // Cache2 keys count: 1 of "5", 128 of "3", 32768 of "1".
+                    if ((i & 1) == 0) {
+                        ds2.addData(key++, 5 - i);
+                        ds4.addData(key++, 5 - i);
+                    }
+                }
+            }
+        }
+
+        awaitPartitionMapExchange(true, true, null);
+
+        List<List<?>> rows;
+
+        // Check 2 partitioned caches.
+        rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(3, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT ALL SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(34817, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1\".table1 INTERSECT SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(2, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(3)));
+
+        rows = sql("SELECT _val FROM \"cache1\".table1 INTERSECT ALL SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(136, rows.size());
+        assertEquals(8, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(3)));
+
+        // Check 1 replicated and 1 partitioned caches.
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(3, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(34817, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(2, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(3)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT ALL SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(136, rows.size());
+        assertEquals(8, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(3)));
+
+        // Check 2 replicated caches.
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(3, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(34817, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(2, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(3)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT ALL SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(136, rows.size());
+        assertEquals(8, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(3)));
+    }
+
+    /** */
+    @Test
+    public void testIntersect() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name FROM emp1 INTERSECT SELECT name FROM emp2");
+
+        assertEquals(2, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+    }
+
+    /** */
+    @Test
+    public void testInstersectAll() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name FROM emp1 INTERSECT ALL SELECT name FROM emp2");
+
+        assertEquals(3, rows.size());
+        assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+    }
+
+    /** */
+    @Test
+    public void testIntersectEmpty() throws Exception {
+        populateTables();
+
+        copyCacheAsReplicated("emp1");
+        copyCacheAsReplicated("emp2");
+
+        List<List<?>> rows = sql("SELECT name FROM emp1 WHERE salary < 0 INTERSECT SELECT name FROM emp2");
+
+        assertEquals(0, rows.size());
+
+        rows = sql("SELECT name FROM emp1_repl WHERE salary < 0 INTERSECT SELECT name FROM emp2_repl");
+
+        assertEquals(0, rows.size());
+    }
+
+    /** */
+    @Test
+    public void testIntersectMerge() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("emp1");
+
+        List<List<?>> rows = sql("SELECT name FROM emp1_repl INTERSECT ALL SELECT name FROM emp2 INTERSECT ALL " +
+            "SELECT name FROM emp1 WHERE salary < 14");
+
+        assertEquals(2, rows.size());
+        assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** */
+    @Test
+    public void testIntersectReplicated() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("emp1");
+        copyCacheAsReplicated("emp2");
+
+        List<List<?>> rows = sql("SELECT name FROM emp1_repl INTERSECT ALL SELECT name FROM emp2_repl");
+
+        assertEquals(3, rows.size());
+        assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+    }
+
+    /** */
+    @Test
+    public void testIntersectReplicatedWithPartitioned() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("emp1");
+
+        List<List<?>> rows = sql("SELECT name FROM emp1_repl INTERSECT ALL SELECT name FROM emp2");
+
+        assertEquals(3, rows.size());
+        assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+    }
+
+    /** */
+    @Test
+    public void testIntersectSeveralColumns() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name, salary FROM emp1 INTERSECT ALL SELECT name, salary FROM emp2");
+
+        assertEquals(2, rows.size());
+        assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** */
+    private List<List<?>> sql(String sql) throws IgniteInterruptedCheckedException {
+        QueryEngine engine = Commons.lookupComponent(client.context(), QueryEngine.class);
+
+        return engine.query(null, "PUBLIC", sql).get(0).getAll();
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 91256552..4779a20 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -475,6 +475,8 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
         String... disabledRules) throws Exception {
         IgniteRel plan = physicalPlan(sql, schema, disabledRules);
 
+        checkSplitAndSerialization(plan, schema);
+
         if (!predicate.test((T)plan)) {
             String invalidPlanMsg = "Invalid plan (" + lastErrorMsg + "):\n" +
                 RelOptUtil.toString(plan, SqlExplainLevel.ALL_ATTRIBUTES);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SetOpPlannerTest.java
similarity index 52%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SetOpPlannerTest.java
index 1b04e91..e307c5f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SetOpPlannerTest.java
@@ -20,28 +20,47 @@ package org.apache.ignite.internal.processors.query.calcite.planner;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapIntersect;
 import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapSetOp;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceIntersect;
 import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceSetOp;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleIntersect;
 import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleSetOp;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
- * Test to verify EXCEPT operator.
+ * Test to verify set op (EXCEPT, INTERSECT).
  */
-public class ExceptPlannerTest extends AbstractPlannerTest {
+@RunWith(Parameterized.class)
+public class SetOpPlannerTest extends AbstractPlannerTest {
+    /** Algorithm. */
+    @Parameterized.Parameter
+    public SetOp setOp;
+
+    /** */
+    @Parameterized.Parameters(name = "SetOp = {0}")
+    public static List<Object[]> parameters() {
+        return Stream.of(SetOp.values()).map(a -> new Object[]{a}).collect(Collectors.toList());
+    }
+
     /** Public schema. */
     private IgniteSchema publicSchema;
 
-    /** Last error. */
-    private String lastError;
-
     /** {@inheritDoc} */
     @Before
     @Override public void setup() {
@@ -83,14 +102,14 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptRandom() throws Exception {
-        String sql = "" +
+    public void testSetOpRandom() throws Exception {
+        String sql =
             "SELECT * FROM random_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM random_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class).and(n -> !n.all)
-            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce).and(n -> !n.all())
+            .and(hasChildThat(isInstanceOf(setOp.map)
                 .and(input(0, isTableScan("random_tbl1")))
                 .and(input(1, isTableScan("random_tbl2")))
             ))
@@ -101,14 +120,14 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptAllRandom() throws Exception {
-        String sql = "" +
+    public void testSetOpAllRandom() throws Exception {
+        String sql =
             "SELECT * FROM random_tbl1 " +
-            "EXCEPT ALL " +
+            setOpAll() +
             "SELECT * FROM random_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class).and(n -> n.all)
-            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce).and(IgniteSetOp::all)
+            .and(hasChildThat(isInstanceOf(setOp.map)
                 .and(input(0, isTableScan("random_tbl1")))
                 .and(input(1, isTableScan("random_tbl2")))
             ))
@@ -119,13 +138,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptBroadcast() throws Exception {
-        String sql = "" +
+    public void testSetOpBroadcast() throws Exception {
+        String sql =
             "SELECT * FROM broadcast_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM broadcast_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
             .and(input(0, isTableScan("broadcast_tbl1")))
             .and(input(1, isTableScan("broadcast_tbl2")))
         );
@@ -135,13 +154,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptSingle() throws Exception {
-        String sql = "" +
+    public void testSetOpSingle() throws Exception {
+        String sql =
             "SELECT * FROM single_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM single_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
             .and(input(0, isTableScan("single_tbl1")))
             .and(input(1, isTableScan("single_tbl2"))));
     }
@@ -150,13 +169,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptSingleAndRandom() throws Exception {
-        String sql = "" +
+    public void testSetOpSingleAndRandom() throws Exception {
+        String sql =
             "SELECT * FROM single_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM random_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
             .and(input(0, isTableScan("single_tbl1")))
             .and(input(1, hasChildThat(isTableScan("random_tbl1")))));
     }
@@ -165,13 +184,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptSingleAndAffinity() throws Exception {
-        String sql = "" +
+    public void testSetOpSingleAndAffinity() throws Exception {
+        String sql =
             "SELECT * FROM single_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM affinity_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
             .and(input(0, isTableScan("single_tbl1")))
             .and(input(1, hasChildThat(isTableScan("affinity_tbl1")))));
     }
@@ -180,13 +199,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptSingleAndBroadcast() throws Exception {
-        String sql = "" +
+    public void testSetOpSingleAndBroadcast() throws Exception {
+        String sql =
             "SELECT * FROM single_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM broadcast_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
             .and(input(0, isTableScan("single_tbl1")))
             .and(input(1, isTableScan("broadcast_tbl1")))
         );
@@ -196,14 +215,14 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptAffinity() throws Exception {
-        String sql = "" +
+    public void testSetOpAffinity() throws Exception {
+        String sql =
             "SELECT * FROM affinity_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM affinity_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class)
-            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+            .and(hasChildThat(isInstanceOf(setOp.map)
                 .and(input(0, isTableScan("affinity_tbl1")))
                 .and(input(1, isTableScan("affinity_tbl2")))
             ))
@@ -214,13 +233,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptBroadcastAndRandom() throws Exception {
-        String sql = "" +
+    public void testSetOpBroadcastAndRandom() throws Exception {
+        String sql =
             "SELECT * FROM random_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM broadcast_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
             .and(input(0, hasChildThat(isTableScan("random_tbl1"))))
             .and(input(1, isTableScan("broadcast_tbl1")))
         );
@@ -230,64 +249,88 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptRandomNested() throws Exception {
-        String sql = "" +
-            "SELECT * FROM random_tbl2 EXCEPT (" +
+    public void testSetOpRandomNested() throws Exception {
+        String sql =
+            "SELECT * FROM random_tbl2 " + setOp() + "(" +
             "   SELECT * FROM random_tbl1 " +
-            "   EXCEPT " +
+            setOp() +
             "   SELECT * FROM random_tbl2" +
             ")";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
-            .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
-            .and(input(1, isInstanceOf(IgniteReduceMinus.class)
-                .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
-                    .and(input(0, isTableScan("random_tbl1")))
-                    .and(input(1, isTableScan("random_tbl2")))
+        if (setOp == SetOp.EXCEPT) {
+            assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+                .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
+                .and(input(1, isInstanceOf(setOp.reduce)
+                    .and(hasChildThat(isInstanceOf(setOp.map)
+                        .and(input(0, isTableScan("random_tbl1")))
+                        .and(input(1, isTableScan("random_tbl2")))
+                    ))
                 ))
-            ))
-        );
+            );
+        }
+        else {
+            // INTERSECT operator is commutative and can be merged.
+            assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+                .and(hasChildThat(isInstanceOf(setOp.map)
+                    .and(input(0, isTableScan("random_tbl2")))
+                    .and(input(1, isTableScan("random_tbl1")))
+                    .and(input(2, isTableScan("random_tbl2")))
+                ))
+            );
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptBroadcastAndRandomNested() throws Exception {
-        String sql = "" +
-            "SELECT * FROM broadcast_tbl1 EXCEPT (" +
+    public void testSetOpBroadcastAndRandomNested() throws Exception {
+        String sql =
+            "SELECT * FROM broadcast_tbl1 " + setOp() + "(" +
             "   SELECT * FROM random_tbl1 " +
-            "   EXCEPT " +
+            setOp() +
             "   SELECT * FROM random_tbl2" +
             ")";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
-            .and(input(0, isTableScan("broadcast_tbl1")))
-            .and(input(1, isInstanceOf(IgniteReduceMinus.class)
-                .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
-                    .and(input(0, isTableScan("random_tbl1")))
-                    .and(input(1, isTableScan("random_tbl2")))
+        if (setOp == SetOp.EXCEPT) {
+            assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+                .and(input(0, isTableScan("broadcast_tbl1")))
+                .and(input(1, isInstanceOf(setOp.reduce)
+                    .and(hasChildThat(isInstanceOf(setOp.map)
+                        .and(input(0, isTableScan("random_tbl1")))
+                        .and(input(1, isTableScan("random_tbl2")))
+                    ))
                 ))
-            ))
-        );
+            );
+        }
+        else {
+            // INTERSECT operator is commutative and can be merged.
+            assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+                .and(hasChildThat(isInstanceOf(setOp.map)
+                    .and(input(0, nodeOrAnyChild(isTableScan("broadcast_tbl1"))))
+                    .and(input(1, isTableScan("random_tbl1")))
+                    .and(input(2, isTableScan("random_tbl2")))
+                ))
+            );
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptMerge() throws Exception {
-        String sql = "" +
+    public void testSetOpMerge() throws Exception {
+        String sql =
             "SELECT * FROM random_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM random_tbl2 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM affinity_tbl1 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM affinity_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class)
-            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+            .and(hasChildThat(isInstanceOf(setOp.map)
                 .and(input(0, isTableScan("random_tbl1")))
                 .and(input(1, isTableScan("random_tbl2")))
                 .and(input(2, isTableScan("affinity_tbl1")))
@@ -300,18 +343,18 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptAllMerge() throws Exception {
-        String sql = "" +
+    public void testSetOpAllMerge() throws Exception {
+        String sql =
             "SELECT * FROM random_tbl1 " +
-            "EXCEPT ALL " +
+            setOpAll() +
             "SELECT * FROM random_tbl2 " +
-            "EXCEPT ALL " +
+            setOpAll() +
             "SELECT * FROM affinity_tbl1 " +
-            "EXCEPT ALL " +
+            setOpAll() +
             "SELECT * FROM affinity_tbl2 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class).and(n -> n.all)
-            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce).and(IgniteSetOp::all)
+            .and(hasChildThat(isInstanceOf(setOp.map)
                 .and(input(0, isTableScan("random_tbl1")))
                 .and(input(1, isTableScan("random_tbl2")))
                 .and(input(2, isTableScan("affinity_tbl1")))
@@ -324,20 +367,66 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testExceptAllWithExceptMerge() throws Exception {
-        String sql = "" +
+    public void testSetOpAllWithExceptMerge() throws Exception {
+        String sql =
             "SELECT * FROM random_tbl1 " +
-            "EXCEPT ALL " +
+            setOpAll() +
             "SELECT * FROM random_tbl2 " +
-            "EXCEPT " +
+            setOp() +
             "SELECT * FROM affinity_tbl1 ";
 
-        assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class).and(n -> !n.all)
-            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+        assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce).and(n -> !n.all())
+            .and(hasChildThat(isInstanceOf(setOp.map)
                 .and(input(0, isTableScan("random_tbl1")))
                 .and(input(1, isTableScan("random_tbl2")))
                 .and(input(2, isTableScan("affinity_tbl1")))
             ))
         );
     }
+
+    /** */
+    private String setOp() {
+        return setOp.name() + ' ';
+    }
+
+    /** */
+    private String setOpAll() {
+        return setOp.name() + " ALL ";
+    }
+
+    /** */
+    enum SetOp {
+        /** */
+        EXCEPT(
+            IgniteSingleMinus.class,
+            IgniteMapMinus.class,
+            IgniteReduceMinus.class
+        ),
+
+        /** */
+        INTERSECT(
+            IgniteSingleIntersect.class,
+            IgniteMapIntersect.class,
+            IgniteReduceIntersect.class
+        );
+
+        /** */
+        public final Class<? extends IgniteSingleSetOp> single;
+
+        /** */
+        public final Class<? extends IgniteMapSetOp> map;
+
+        /** */
+        public final Class<? extends IgniteReduceSetOp> reduce;
+
+        /** */
+        SetOp(
+            Class<? extends IgniteSingleSetOp> single,
+            Class<? extends IgniteMapSetOp> map,
+            Class<? extends IgniteReduceSetOp> reduce) {
+            this.single = single;
+            this.map = map;
+            this.reduce = reduce;
+        }
+    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionPlannerTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionPlannerTest.java
index fdd026d..227a95b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionPlannerTest.java
@@ -33,7 +33,7 @@ import org.junit.Test;
 /**
  * Test table functions.
  */
-public class TableFunctionTest extends AbstractPlannerTest {
+public class TableFunctionPlannerTest extends AbstractPlannerTest {
     /** Public schema. */
     private IgniteSchema publicSchema;
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index 877228e..3b4ad0b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTes
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateSingleGroupExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashIndexSpoolExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IntersectExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
@@ -48,6 +49,7 @@ import org.junit.runners.Suite;
     HashAggregateSingleGroupExecutionTest.class,
     SortAggregateExecutionTest.class,
     MinusExecutionTest.class,
+    IntersectExecutionTest.class,
     RuntimeTreeIndexTest.class,
 })
 public class ExecutionTestSuite {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index fd9e0a1..a22299c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.Aggregate
 import org.apache.ignite.internal.processors.query.calcite.integration.CalciteErrorHandlilngIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.IndexSpoolIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.SetOpIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.SortAggregateIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.TableDdlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.TableDmlIntegrationTest;
@@ -61,6 +62,7 @@ import org.junit.runners.Suite;
     TableDmlIntegrationTest.class,
     DataTypesTest.class,
     IndexSpoolIntegrationTest.class,
+    SetOpIntegrationTest.class,
 })
 public class IntegrationTestSuite {
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 47136f3..14f3263 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -20,15 +20,15 @@ package org.apache.ignite.testsuites;
 import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.SetOpPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.TableDmlPlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.TableFunctionTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.TableFunctionPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -48,8 +48,8 @@ import org.junit.runners.Suite;
     HashAggregatePlannerTest.class,
     SortAggregatePlannerTest.class,
     JoinColocationPlannerTest.class,
-    ExceptPlannerTest.class,
-    TableFunctionTest.class,
+    SetOpPlannerTest.class,
+    TableFunctionPlannerTest.class,
     TableDmlPlannerTest.class,
 })
 public class PlannerTestSuite {