You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/04/22 14:05:22 UTC

[ignite] branch sql-calcite updated: IGNITE-13691 Support of EXCEPT operator - Fixes #9009.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new a651823  IGNITE-13691 Support of EXCEPT operator - Fixes #9009.
a651823 is described below

commit a65182349eb8f2820225bf356020d487bd4b8eaf
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Thu Apr 22 16:39:04 2021 +0300

    IGNITE-13691 Support of EXCEPT operator - Fixes #9009.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/exec/ExecutionServiceImpl.java   |   1 +
 .../query/calcite/exec/LogicalRelImplementor.java  |  35 ++
 .../query/calcite/exec/exp/agg/GroupKey.java       |   5 +
 .../query/calcite/exec/rel/MinusNode.java          | 387 +++++++++++++++++
 .../query/calcite/exec/rel/RootNode.java           |   4 +-
 .../query/calcite/externalize/RelJson.java         |   1 +
 .../query/calcite/metadata/IgniteMdRowCount.java   |   8 +
 .../processors/query/calcite/prepare/Cloner.java   |  20 +
 .../query/calcite/prepare/IgniteRelShuttle.java    |  18 +
 .../query/calcite/prepare/PlannerPhase.java        |   4 +
 .../query/calcite/rel/IgniteRelVisitor.java        |  18 +
 .../query/calcite/rel/set/IgniteMapMinus.java      | 141 +++++++
 .../query/calcite/rel/set/IgniteMinusBase.java     | 100 +++++
 .../query/calcite/rel/set/IgniteReduceMinus.java   | 135 ++++++
 .../query/calcite/rel/set/IgniteSingleMinus.java   | 134 ++++++
 .../query/calcite/rule/MinusConverterRule.java     |  94 +++++
 .../query/calcite/CalciteQueryProcessorTest.java   | 240 ++++++++++-
 .../query/calcite/exec/rel/MinusExecutionTest.java | 222 ++++++++++
 .../query/calcite/planner/ExceptPlannerTest.java   | 468 +++++++++++++++++++++
 .../ignite/testsuites/ExecutionTestSuite.java      |   2 +
 .../apache/ignite/testsuites/PlannerTestSuite.java |   2 +
 21 files changed, 2028 insertions(+), 11 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index b3b0b6c..ab3bb4e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -563,6 +563,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
             case WITH:
             case VALUES:
             case UNION:
+            case EXCEPT:
                 return prepareQuery(sqlNode, ctx);
 
             case INSERT:
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index f9cfcf5..873a2f0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ModifyNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
@@ -83,6 +84,10 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMinusBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
@@ -424,6 +429,36 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteSingleMinus rel) {
+        return visit(rel, AggregateType.SINGLE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteMapMinus rel) {
+        return visit(rel, AggregateType.MAP);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteReduceMinus rel) {
+        return visit(rel, AggregateType.REDUCE);
+    }
+
+    /** Builds the execution node shared by the single, map and reduce flavours of MINUS (EXCEPT). */
+    private Node<Row> visit(IgniteMinusBase rel, AggregateType aggType) {
+        RelDataType outType = rel.getRowType();
+
+        RowFactory<Row> factory = ctx.rowHandler().factory(ctx.getTypeFactory(), outType);
+
+        List<Node<Row>> children = Commons.transform(rel.getInputs(), this::visit);
+
+        MinusNode<Row> minus = new MinusNode<>(ctx, outType, aggType, rel.all, factory);
+
+        minus.register(children);
+
+        return minus;
+    }
+
+    /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteTableModify rel) {
         switch (rel.getOperation()) {
             case INSERT:
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
index 2796a14..efdf857 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
@@ -40,6 +40,11 @@ public class GroupKey {
         return fields[idx];
     }
 
+    /** @return Number of fields stored in this key. */
+    public int fieldsCount() {
+        return fields.length;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         if (this == o)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
new file mode 100644
index 0000000..4859119
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Execution node for MINUS (EXCEPT) operator.
+ */
+public class MinusNode<Row> extends AbstractNode<Row> {
+    /** Execution phase of this node: SINGLE, MAP or REDUCE. */
+    private final AggregateType type;
+
+    /** {@code true} for the ALL flavour of the operator, where a key may yield more than one row. */
+    private final boolean all;
+
+    /** Factory for the rows sent downstream. */
+    private final RowFactory<Row> rowFactory;
+
+    /** Per-key counters collected from the sources. */
+    private final Grouping grouping;
+
+    /** Number of rows requested by downstream and not yet pushed to it. */
+    private int requested;
+
+    /** Number of rows still awaited from the current source; {@code -1} once the input is over. */
+    private int waiting;
+
+    /** Current source index. */
+    private int curSrcIdx;
+
+    /** {@code true} while {@code flush()} is pushing rows downstream. */
+    private boolean inLoop;
+
+    /**
+     * @param ctx Execution context ({@code rowType} goes to the parent, other arguments to same-named fields).
+     */
+    public MinusNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory) {
+        super(ctx, rowType);
+
+        this.all = all;
+        this.type = type;
+        this.rowFactory = rowFactory;
+
+        grouping = new Grouping();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources());
+        assert rowsCnt > 0 && requested == 0;
+        assert waiting <= 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (waiting == 0) // Nothing is awaited from the current source: ask it for a batch of rows.
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+        else if (!inLoop) // Input is over (waiting < 0): resume emitting unless flush() is pushing rows right now.
+            context().execute(this::flush, this::onError);
+    }
+
+    /** Accepts a row from the source {@code idx}, counts it and keeps the current source supplying rows. */
+    public void push(Row row, int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        grouping.add(row, idx);
+
+        if (waiting == 0) // The requested batch is fully received: ask the current source for the next one.
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+    }
+
+    /** Handles the end of the source {@code idx}: switches to the next source or starts emitting the result. */
+    public void end(int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+        assert curSrcIdx == idx;
+
+        checkState();
+
+        if (type == AggregateType.SINGLE && grouping.isEmpty())
+            curSrcIdx = sources().size(); // Skip subsequent sources.
+        else
+            curSrcIdx++;
+
+        if (curSrcIdx >= sources().size()) {
+            waiting = -1; // Marks that no more input is expected; request() and flush() check this value.
+
+            flush();
+        }
+        else // The next source is asked for the not yet received remainder of the current batch.
+            sources().get(curSrcIdx).request(waiting);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        requested = waiting = 0;
+        curSrcIdx = 0; // Restart from the first source; a stale index would point past the sources after a full run.
+        grouping.groups.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        return new Downstream<Row>() { // Adapter forwarding source events to this node, with the source index added.
+            @Override public void push(Row row) throws Exception {
+                MinusNode.this.push(row, idx);
+            }
+
+            @Override public void end() throws Exception {
+                MinusNode.this.end(idx);
+            }
+
+            @Override public void onError(Throwable e) {
+                MinusNode.this.onError(e); // Errors are passed on without the source index.
+            }
+        };
+    }
+
+    /** Pushes collected result rows downstream while requested; signals the end once the groups are exhausted. */
+    private void flush() throws Exception {
+        if (isClosed())
+            return;
+
+        checkState();
+
+        assert waiting == -1;
+
+        int processed = 0;
+
+        inLoop = true; // Checked by request(): while set, it does not schedule another flush.
+
+        try {
+            while (requested > 0 && !grouping.isEmpty()) {
+                int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed); // At most IN_BUFFER_SIZE rows per run.
+
+                for (Row row : grouping.getRows(toSnd)) {
+                    checkState();
+
+                    requested--;
+                    downstream().push(row);
+
+                    processed++;
+                }
+
+                if (processed >= IN_BUFFER_SIZE && requested > 0) {
+                    // Allow others to do their job.
+                    context().execute(this::flush, this::onError);
+
+                    return;
+                }
+            }
+        }
+        finally {
+            inLoop = false;
+        }
+
+        if (requested > 0) { // Groups are exhausted but downstream still waits for rows: report the end.
+            requested = 0;
+
+            downstream().end();
+        }
+    }
+
+    /** */
+    private class Grouping {
+        /**
+         * Value in this map will always have 2 elements, first - count of keys in the first set, second - count of
+         * keys in all sets except first.
+         */
+        private final Map<GroupKey, int[]> groups = new HashMap<>();
+
+        /** Row handler of the execution context, used to read row columns. */
+        private final RowHandler<Row> hnd;
+
+        /** Takes the row handler from the context of the enclosing node. */
+        private Grouping() {
+            hnd = context().rowHandler();
+        }
+
+        /** Routes the row to the counters update which matches the execution phase of this node. */
+        private void add(Row row, int setIdx) {
+            if (type == AggregateType.MAP)
+                addOnMapper(row, setIdx);
+            else if (type != AggregateType.REDUCE)
+                addOnSingle(row, setIdx);
+            else {
+                // Rows for the reduce phase are expected from the input with index 0 only.
+                assert setIdx == 0 : "Unexpected set index: " + setIdx;
+                addOnReducer(row);
+            }
+        }
+
+        /**
+         * Takes the next batch of output rows out of the collected groups.
+         *
+         * @param cnt Maximum number of rows to return.
+         *
+         * @return Rows to pass downstream, possibly fewer than {@code cnt}.
+         */
+        private List<Row> getRows(int cnt) {
+            if (F.isEmpty(groups))
+                return Collections.emptyList();
+
+            return type == AggregateType.MAP ? getOnMapper(cnt) : getOnSingleOrReducer(cnt);
+        }
+
+        /** Wraps all columns of the row into a key for the {@link #groups} map. */
+        private GroupKey key(Row row) {
+            int colsCnt = hnd.columnCount(row);
+            Object[] vals = new Object[colsCnt];
+
+            // The whole row is the key, so every column is copied.
+            for (int col = 0; col < colsCnt; col++)
+                vals[col] = hnd.get(col, row);
+
+            return new GroupKey(vals);
+        }
+
+        /** Counts a row of the first set, or cancels one occurrence of its key for a row of any other set. */
+        private void addOnSingle(Row row, int setIdx) {
+            GroupKey key = key(row);
+
+            if (setIdx == 0) {
+                // Only the first set can introduce new groups.
+                groups.computeIfAbsent(key, k -> new int[2])[0]++;
+
+                return;
+            }
+
+            int[] cntrs = groups.get(key);
+
+            // Keys without a group (never seen in the first set or already cancelled) are ignored.
+            if (cntrs == null)
+                return;
+
+            cntrs[1]++;
+            // The group is dropped once the other sets match every occurrence counted for the first set.
+            if (cntrs[1] >= cntrs[0])
+                groups.remove(key);
+        }
+
+        /** Counts the row per key: slot 0 for the first set, slot 1 for any other set. */
+        private void addOnMapper(Row row, int setIdx) {
+            int slot = setIdx == 0 ? 0 : 1;
+
+            groups.computeIfAbsent(key(row), k -> new int[2])[slot]++;
+        }
+
+        /** Adds the counters of a row shaped as {@code getOnMapper} builds it (key, counters) to the key totals. */
+        private void addOnReducer(Row row) {
+            GroupKey mapKey = (GroupKey)hnd.get(0, row);
+
+            int[] totals = groups.computeIfAbsent(mapKey, k -> new int[2]);
+
+            int[] delta = (int[])hnd.get(1, row);
+
+            for (int idx = 0; idx < delta.length; idx++)
+                totals[idx] += delta[idx];
+        }
+
+        /** Moves up to {@code cnt} groups out of the map as rows of two columns: the key and its counters. */
+        private List<Row> getOnMapper(int cnt) {
+            Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
+
+            int amount = Math.min(cnt, groups.size());
+            List<Row> res = new ArrayList<>(amount);
+
+            while (amount > 0 && it.hasNext()) {
+                Map.Entry<GroupKey, int[]> entry = it.next();
+
+                // Equal counters cancel out only for ALL; otherwise the reducer must still see the other-set hit.
+                if (!all || entry.getValue()[0] != entry.getValue()[1]) {
+                    res.add(rowFactory.create(entry.getKey(), entry.getValue()));
+
+                    amount--;
+                }
+
+                it.remove();
+            }
+
+            return res;
+        }
+
+        /** Drains up to {@code cnt} result rows from the groups; a group is removed once it is fully emitted. */
+        private List<Row> getOnSingleOrReducer(int cnt) {
+            Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
+
+            List<Row> res = new ArrayList<>(cnt);
+
+            while (it.hasNext() && cnt > 0) {
+                Map.Entry<GroupKey, int[]> entry = it.next();
+
+                GroupKey key = entry.getKey(); // Entry data is read before the entry can be removed.
+                int[] cntrs = entry.getValue();
+
+                int availableRows = availableRows(cntrs);
+
+                if (availableRows <= cnt) {
+                    it.remove();
+
+                    if (availableRows == 0)
+                        continue;
+
+                    cnt -= availableRows;
+                }
+                else {
+                    availableRows = cnt;
+
+                    decrementAvailableRows(cntrs, availableRows);
+
+                    cnt = 0;
+                }
+
+                // The row is built only for groups which actually produce output.
+                Object[] fields = new Object[key.fieldsCount()];
+
+                for (int i = 0; i < fields.length; i++)
+                    fields[i] = key.field(i);
+
+                Row row = rowFactory.create(fields);
+
+                for (int i = 0; i < availableRows; i++)
+                    res.add(row);
+            }
+
+            return res;
+        }
+
+        /** @return Rows the group still yields: counters difference for ALL, else 1 unless seen in other sets. */
+        private int availableRows(int[] cntrs) {
+            assert cntrs.length == 2;
+
+            if (!all)
+                return cntrs[1] == 0 ? 1 : 0;
+
+            return Math.max(cntrs[0] - cntrs[1], 0);
+        }
+
+        /** Takes {@code amount} rows off the first-set counter of a group which is emitted only partially. */
+        private void decrementAvailableRows(int[] cntrs, int amount) {
+            assert all;
+            assert amount > 0;
+
+            cntrs[0] = cntrs[0] - amount;
+        }
+
+
+        /** @return {@code true} if no groups are left in the map. */
+        private boolean isEmpty() {
+            return groups.isEmpty();
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 9bdda10..a4ce9ee 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -140,10 +140,10 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
 
     /** {@inheritDoc} */
     @Override public void push(Row row) throws Exception {
-        assert waiting > 0;
-
         lock.lock();
         try {
+            assert waiting > 0;
+
             checkState();
 
             waiting--;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index d454b9a..cf024a7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -218,6 +218,7 @@ class RelJson {
         ImmutableList.of(
             "org.apache.ignite.internal.processors.query.calcite.rel.",
             "org.apache.ignite.internal.processors.query.calcite.rel.agg.",
+            "org.apache.ignite.internal.processors.query.calcite.rel.set.",
             "org.apache.calcite.rel.",
             "org.apache.calcite.rel.core.",
             "org.apache.calcite.rel.logical.",
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 8be9f1a..4b01441 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -31,6 +31,7 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMinusBase;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
@@ -114,4 +115,11 @@ public class IgniteMdRowCount extends RelMdRowCount {
     public double getRowCount(IgniteSortedIndexSpool rel, RelMetadataQuery mq) {
         return rel.estimateRowCount(mq);
     }
+
+    /**
+     * Estimation of row count for MINUS (EXCEPT) operator.
+     */
+    public double getRowCount(IgniteMinusBase rel, RelMetadataQuery mq) {
+        return rel.estimateRowCount(mq);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 5ef854a..2a8b410 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -46,6 +46,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -215,15 +218,32 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
 
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteReduceSortAggregate rel) {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
 
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteHashIndexSpool rel) {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSingleMinus rel) {
+        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteMapMinus rel) {
+        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteReduceMinus rel) {
+        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteRel rel) {
         return rel.accept(this);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index a069f19..6397b30 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -46,6 +46,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /** */
@@ -180,6 +183,21 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
         return rel.accept(this);
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSingleMinus rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteMapMinus rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteReduceMinus rel) {
+        return processNode(rel);
+    }
+
     /**
      * Visits all children of a parent.
      */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 3dced8d..ccfce87 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMerge
 import org.apache.ignite.internal.processors.query.calcite.rule.HashAggregateConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.MergeJoinConverterRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.MinusConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.NestedLoopJoinConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
@@ -137,6 +138,7 @@ public enum PlannerPhase {
                                 .anyInputs()).toRule(),
 
                     CoreRules.UNION_MERGE,
+                    CoreRules.MINUS_MERGE,
                     CoreRules.UNION_REMOVE,
                     CoreRules.JOIN_COMMUTE,
                     CoreRules.AGGREGATE_REMOVE,
@@ -168,6 +170,8 @@ public enum PlannerPhase {
                     HashAggregateConverterRule.MAP_REDUCE,
                     SortAggregateConverterRule.SINGLE,
                     SortAggregateConverterRule.MAP_REDUCE,
+                    MinusConverterRule.SINGLE,
+                    MinusConverterRule.MAP_REDUCE,
                     MergeJoinConverterRule.INSTANCE,
                     NestedLoopJoinConverterRule.INSTANCE,
                     ProjectConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 072b587e..cab5c5f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -23,6 +23,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
 
 /**
  * A visitor to traverse an Ignite relational nodes tree.
@@ -154,6 +157,21 @@ public interface IgniteRelVisitor<T> {
     T visit(IgniteHashIndexSpool rel);
 
     /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
+    T visit(IgniteSingleMinus rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
+    T visit(IgniteMapMinus rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
+    T visit(IgniteReduceMinus rel);
+
+    /**
      * Visits a relational node and calculates a result on the basis of node meta information.
      * @param rel Relational node.
      * @return Visit result.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
new file mode 100644
index 0000000..061cc1c
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of MINUS (EXCEPT) operator. Its row type is a (GROUP_KEY, COUNTERS) pair.
+ */
+public class IgniteMapMinus extends IgniteMinusBase {
+    /** Creates a MAP phase node over the given inputs; all arguments are passed to the base class as is. */
+    public IgniteMapMinus(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        List<RelNode> inputs,
+        boolean all
+    ) {
+        super(cluster, traitSet, inputs, all);
+    }
+
+    /** Creates the node from a {@link RelInput} by delegating to the base class constructor. */
+    public IgniteMapMinus(RelInput input) {
+        super(input);
+    }
+
+    /** {@inheritDoc} The copy is created in the cluster of this node. */
+    @Override public IgniteMapMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteMapMinus(getCluster(), traitSet, inputs, all);
+    }
+
+    /** {@inheritDoc} The clone keeps the trait set and the ALL flag of this node. */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteMapMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} The row has two Java-typed fields: GROUP_KEY ({@link GroupKey}) and COUNTERS ({@code int[]}). */
+    @Override protected RelDataType deriveRowType() {
+        RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
+
+        assert typeFactory instanceof IgniteTypeFactory;
+
+        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+
+        builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
+        builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
+
+        return builder.build();
+    }
+
+    /** {@inheritDoc} Node is REWINDABLE only if all inputs are rewindable; otherwise node and inputs become ONE_WAY. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} Offers RANDOM for the node and all inputs, or nothing in the two cases rejected below. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+            return ImmutableList.of(); // Mixing of single and random is prohibited.
+
+        return ImmutableList.of(
+            Pair.of(nodeTraits.replace(IgniteDistributions.random()), Commons.transform(inputTraits,
+                t -> t.replace(IgniteDistributions.random())))
+        );
+    }
+
+    /** {@inheritDoc} Node correlation is built from the correlation ids of all inputs; input traits are kept. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
+        Set<CorrelationId> correlationIds = inTraits.stream()
+            .map(TraitUtils::correlation)
+            .flatMap(corrTr -> corrTr.correlationIds().stream())
+            .collect(Collectors.toSet());
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(correlationIds)),
+            inTraits));
+    }
+
+    /** {@inheritDoc} Field count of the first input's row type plus {@code COUNTER_FIELDS_CNT}. */
+    @Override protected int aggregateFieldsCount() {
+        return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
new file mode 100644
index 0000000..b2cf77b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Base class for physical MINUS (EXCEPT) set op. Provides collation handling, row count and cost estimation.
+ */
+public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgniteRel {
+    /** Count of counter fields used to aggregate results. */
+    protected static final int COUNTER_FIELDS_CNT = 2;
+
+    /** Creates the node; all arguments are passed to the {@link Minus} constructor as is. */
+    IgniteMinusBase(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+        super(cluster, traits, inputs, all);
+    }
+
+    /** Creates the node from a {@link RelInput} passed through {@code TraitUtils.changeTraits} with Ignite convention. */
+    protected IgniteMinusBase(RelInput input) {
+        super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
+    }
+
+    /** {@inheritDoc} Always answers with empty collation for the node and for every input. */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+        // Operation erases collation.
+        return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY)));
+    }
+
+    /** {@inheritDoc} Always yields a single option: empty collation for the node and for every input. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+        // Operation erases collation.
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+            Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY))));
+    }
+
+    /** Gets count of fields for aggregation for this node. Required for memory consumption calculation. */
+    protected abstract int aggregateFieldsCount();
+
+    /** {@inheritDoc} Starts from the first input's row count; each other input subtracts half of min(estimate, its rows). */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        final List<RelNode> inputs = getInputs();
+
+        double rows = mq.getRowCount(inputs.get(0));
+
+        for (int i = 1; i < inputs.size(); i++)
+            rows -= 0.5 * Math.min(rows, mq.getRowCount(inputs.get(i)));
+
+        return rows;
+    }
+
+    /** {@inheritDoc} Memory is estimated as half of total input rows * aggregated fields count * average field size. */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        double rows = estimateRowCount(mq);
+
+        double inputRows = 0;
+
+        for (RelNode input : getInputs())
+            inputRows += mq.getRowCount(input);
+
+        double mem = 0.5 * inputRows * aggregateFieldsCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+
+        return costFactory.makeCost(rows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
new file mode 100644
index 0000000..c068e8e
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for REDUCE phase of MINUS (EXCEPT) operator. Has a single input and an explicitly set row type.
+ */
+public class IgniteReduceMinus extends IgniteMinusBase {
+    /** Creates a REDUCE phase node over a single input; the given row type becomes the row type of the node. */
+    public IgniteReduceMinus(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        boolean all,
+        RelDataType rowType
+    ) {
+        super(cluster, traitSet, ImmutableList.of(input), all);
+
+        this.rowType = rowType;
+    }
+
+    /** Creates the node from a {@link RelInput}, reading the "all" flag (default {@code false}) and "rowType". */
+    public IgniteReduceMinus(RelInput input) {
+        this(
+            input.getCluster(),
+            input.getTraitSet().replace(IgniteConvention.INSTANCE),
+            input.getInput(),
+            input.getBoolean("all", false),
+            input.getRowType("rowType")
+        );
+    }
+
+    /** {@inheritDoc} Additionally writes the "rowType" item when the detail level is ALL_ATTRIBUTES. */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        super.explainTerms(pw)
+            .itemIf("rowType", rowType, pw.getDetailLevel() == SqlExplainLevel.ALL_ATTRIBUTES);
+
+        return pw;
+    }
+
+    /** {@inheritDoc} Node is always ONE_WAY; traits of the first input are kept as is. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        return ImmutableList.of(
+            Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
+    }
+
+    /** {@inheritDoc} Passes SINGLE to node and inputs if SINGLE satisfies the requested one, else returns {@code null}. */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
+
+        if (IgniteDistributions.single().satisfies(distr)) {
+            return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
+                Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} Always yields SINGLE for both the node and its sole input. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()),
+            ImmutableList.of(sole(inputTraits).replace(IgniteDistributions.single()))));
+    }
+
+    /** {@inheritDoc} Node takes the correlation trait of its first input; input traits are kept. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
+        return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+            inTraits));
+    }
+
+    /** {@inheritDoc} The copy is created in the cluster of this node and keeps its row type. */
+    @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteReduceMinus(getCluster(), traitSet, sole(inputs), all, rowType);
+    }
+
+    /** {@inheritDoc} The clone keeps the trait set, the ALL flag and the row type of this node. */
+    @Override public IgniteReduceMinus clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteReduceMinus(cluster, getTraitSet(), sole(inputs), all, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} Field count of this node's row type plus {@code COUNTER_FIELDS_CNT}. */
+    @Override protected int aggregateFieldsCount() {
+        return rowType.getFieldCount() + COUNTER_FIELDS_CNT;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
new file mode 100644
index 0000000..3165bb5
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MINUS (EXCEPT) operator whose inputs satisfy SINGLE distribution.
+ */
+public class IgniteSingleMinus extends IgniteMinusBase {
+    /** Creates the node over the given inputs; all arguments are passed to the base class as is. */
+    public IgniteSingleMinus(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        List<RelNode> inputs,
+        boolean all
+    ) {
+        super(cluster, traitSet, inputs, all);
+    }
+
+    /** Creates the node from a {@link RelInput} by delegating to the base class constructor. */
+    public IgniteSingleMinus(RelInput input) {
+        super(input);
+    }
+
+    /** {@inheritDoc} Node is REWINDABLE only if all inputs are rewindable; otherwise node and inputs become ONE_WAY. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} Passes SINGLE to node and inputs if SINGLE satisfies the requested one, else returns {@code null}. */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
+        IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
+
+        if (IgniteDistributions.single().satisfies(distr)) {
+            return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
+                Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} Yields SINGLE for the node (input traits kept) only if every input satisfies SINGLE. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean single = inputTraits.stream()
+            .map(TraitUtils::distribution)
+            .allMatch(d -> d.satisfies(IgniteDistributions.single()));
+
+        if (!single)
+            return ImmutableList.of();
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), inputTraits));
+    }
+
+    /** {@inheritDoc} Node correlation is built from the correlation ids of all inputs; input traits are kept. */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
+        Set<CorrelationId> correlationIds = inTraits.stream()
+            .map(TraitUtils::correlation)
+            .flatMap(corrTr -> corrTr.correlationIds().stream())
+            .collect(Collectors.toSet());
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(correlationIds)),
+            inTraits));
+    }
+
+    /** {@inheritDoc} The copy is created in the cluster of this node. */
+    @Override public IgniteSingleMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteSingleMinus(getCluster(), traitSet, inputs, all);
+    }
+
+    /** {@inheritDoc} The clone keeps the trait set and the ALL flag of this node. */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteSingleMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} Field count of the first input's row type plus {@code COUNTER_FIELDS_CNT}. */
+    @Override protected int aggregateFieldsCount() {
+        return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java
new file mode 100644
index 0000000..7e11c0c
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+
+/**
+ * MINUS (EXCEPT) operation converter rule. Holds two rules: single-phase and map-reduce conversion.
+ */
+public class MinusConverterRule {
+    /** Rule converting {@link LogicalMinus} to {@link IgniteSingleMinus}. */
+    public static final RelOptRule SINGLE = new SingleMinusConverterRule();
+
+    /** Rule converting {@link LogicalMinus} to {@link IgniteReduceMinus} over {@link IgniteMapMinus}. */
+    public static final RelOptRule MAP_REDUCE = new MapReduceMinusConverterRule();
+
+    /** Private constructor: the class only exposes the rule constants above. */
+    private MinusConverterRule() {
+        // No-op.
+    }
+
+    /** Converts a logical MINUS to {@link IgniteSingleMinus} with all inputs converted to SINGLE distribution. */
+    private static class SingleMinusConverterRule extends AbstractIgniteConverterRule<LogicalMinus> {
+        /** Passes the {@link LogicalMinus} class and the rule name to the parent constructor. */
+        SingleMinusConverterRule() {
+            super(LogicalMinus.class, "SingleMinusConverterRule");
+        }
+
+        /** {@inheritDoc} Inputs and the resulting node get Ignite convention with SINGLE distribution. */
+        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalMinus setOp) {
+            RelOptCluster cluster = setOp.getCluster();
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+            List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
+
+            return new IgniteSingleMinus(cluster, outTrait, inputs, setOp.all);
+        }
+    }
+
+    /** Converts a logical MINUS to {@link IgniteReduceMinus} whose input is {@link IgniteMapMinus}. */
+    private static class MapReduceMinusConverterRule extends AbstractIgniteConverterRule<LogicalMinus> {
+        /** Passes the {@link LogicalMinus} class and the rule name to the parent constructor. */
+        MapReduceMinusConverterRule() {
+            super(LogicalMinus.class, "MapReduceMinusConverterRule");
+        }
+
+        /** {@inheritDoc} Reduce node is SINGLE, reads the map node converted to SINGLE, and gets leastRestrictive row type. */
+        @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalMinus setOp) {
+            RelOptCluster cluster = setOp.getCluster();
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+            RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+            List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
+
+            RelNode map = new IgniteMapMinus(cluster, outTrait, inputs, setOp.all);
+
+            return new IgniteReduceMinus(
+                cluster,
+                outTrait.replace(IgniteDistributions.single()),
+                convert(map, inTrait.replace(IgniteDistributions.single())),
+                setOp.all,
+                cluster.getTypeFactory().leastRestrictive(Util.transform(inputs, RelNode::getRowType))
+            );
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 42e53dc..cd14468 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -26,11 +26,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import javax.cache.Cache;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
@@ -452,13 +454,13 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         +----+-------+-------+
         | ID | NAME  | SALARY|
         +----+-------+-------+
-        |  1 | igor  |   10  |
-        |  2 | igor  |   11  |
-        |  3 | igor  |   12  |
-        |  4 | igor1 |   13  |
-        |  5 | igor1 |   13  |
-        |  6 | igor1 |   13  |
-        |  7 | roman |   14  |
+        |  1 | Igor  |   10  |
+        |  2 | Igor  |   11  |
+        |  3 | Igor  |   12  |
+        |  4 | Igor1 |   13  |
+        |  5 | Igor1 |   13  |
+        |  6 | Igor1 |   13  |
+        |  7 | Roman |   14  |
         +----+-------+-------+
 
         select * from account;
@@ -469,14 +471,33 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         |  2 | Roman |   11  |
         |  3 | Roman |   12  |
         |  4 | Roman |   13  |
-        |  5 | igor1 |   13  |
-        |  6 | igor1 |   13  |
+        |  5 | Igor1 |   13  |
+        |  6 | Igor1 |   13  |
         +----+-------+-------+
          */
 
         awaitPartitionMapExchange(true, true, null);
     }
 
+    /** Copies a cache with its content to a new replicated cache ("Replicated" name suffix, "_repl" table suffix). */
+    private void copyCacheAsReplicated(String cacheName) throws InterruptedException {
+        IgniteCache<Object, Object> cache = client.cache(cacheName);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<Object, Object>(
+            cache.getConfiguration(CacheConfiguration.class));
+
+        ccfg.setName(cacheName + "Replicated");
+        ccfg.setCacheMode(CacheMode.REPLICATED);
+        ccfg.getQueryEntities().forEach(qe -> qe.setTableName(qe.getTableName() + "_repl"));
+
+        IgniteCache<Object, Object> replCache = client.getOrCreateCache(ccfg);
+
+        for (Cache.Entry<?, ?> entry : cache)
+            replCache.put(entry.getKey(), entry.getValue());
+
+        awaitPartitionMapExchange(true, true, null);
+    }
+
     /** */
     @Test
     public void testOrderingByColumnOutsideSelectList() throws InterruptedException {
@@ -632,6 +653,207 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         assertEquals(1, rows.size());
     }
 
+    /** Checks that EXCEPT returns the only name that is present in Orders but absent from Account. */
+    @Test
+    public void testExcept() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name FROM Orders EXCEPT SELECT name from Account");
+
+        assertEquals(1, rows.size());
+        assertEquals("Igor", rows.get(0).get(0));
+    }
+
+    /** Checks that EXCEPT with an empty left operand returns no rows for original and replicated tables. */
+    @Test
+    public void testExceptFromEmpty() throws Exception {
+        populateTables();
+
+        copyCacheAsReplicated("orders");
+        copyCacheAsReplicated("account");
+
+        List<List<?>> rows = sql("SELECT name FROM Orders WHERE salary < 0 EXCEPT SELECT name FROM Account");
+
+        assertEquals(0, rows.size());
+
+        rows = sql("SELECT name FROM Orders_repl WHERE salary < 0 EXCEPT SELECT name FROM Account_repl");
+
+        assertEquals(0, rows.size());
+    }
+
+    /** Checks EXCEPT over a select list of two columns (name and salary). */
+    @Test
+    public void testExceptSeveralColumns() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name, salary FROM Orders EXCEPT SELECT name, salary from Account");
+
+        assertEquals(4, rows.size());
+        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+    }
+
+    /** Checks that EXCEPT ALL keeps the duplicate rows that remain after the subtraction. */
+    @Test
+    public void testExceptAll() throws Exception {
+        populateTables();
+
+        List<List<?>> rows = sql("SELECT name FROM Orders EXCEPT ALL SELECT name from Account");
+
+        assertEquals(4, rows.size());
+        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** Checks EXCEPT whose right operand is itself a parenthesized EXCEPT query. */
+    @Test
+    public void testExceptNested() throws Exception {
+        populateTables();
+
+        List<List<?>> rows =
+            sql("SELECT name FROM Orders EXCEPT (SELECT name FROM Orders EXCEPT SELECT name from Account)");
+
+        assertEquals(2, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** Checks EXCEPT ALL between the replicated copy of Orders and the original Account table. */
+    @Test
+    public void testExceptReplicatedWithPartitioned() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("orders");
+
+        List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name from Account");
+
+        assertEquals(4, rows.size());
+        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** Checks EXCEPT ALL when both operands are replicated copies of the tables. */
+    @Test
+    public void testExceptReplicated() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("orders");
+        copyCacheAsReplicated("account");
+
+        List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name from Account_repl");
+
+        assertEquals(4, rows.size());
+        assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** Checks a chain of three EXCEPT ALL operands mixing the replicated Orders copy and original tables. */
+    @Test
+    public void testExceptMerge() throws Exception {
+        populateTables();
+        copyCacheAsReplicated("orders");
+
+        List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name FROM Account EXCEPT ALL " +
+            "SELECT name FROM orders WHERE salary < 11");
+
+        assertEquals(3, rows.size());
+        assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor")));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+    }
+
+    /** Checks EXCEPT and EXCEPT ALL on large data sets for all partitioned/replicated cache combinations. */
+    @Test
+    public void testExceptBigBatch() throws Exception {
+        client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+            .setName("cache1")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table1")))
+            .setBackups(2)
+        );
+
+        client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+            .setName("cache2")
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table2")))
+            .setBackups(1)
+        );
+
+        copyCacheAsReplicated("cache1");
+        copyCacheAsReplicated("cache2");
+
+        try (IgniteDataStreamer<Integer, Integer> ds1 = client.dataStreamer("cache1");
+             IgniteDataStreamer<Integer, Integer> ds2 = client.dataStreamer("cache2");
+             IgniteDataStreamer<Integer, Integer> ds3 = client.dataStreamer("cache1Replicated");
+             IgniteDataStreamer<Integer, Integer> ds4 = client.dataStreamer("cache2Replicated")
+        ) {
+            int key = 0;
+
+            for (int i = 0; i < 5; i++) {
+                for (int j = 0; j < ((i == 0) ? 1 : (1 << (i * 4 - 1))); j++) {
+                    // Cache1 keys count: 1 of "0", 8 of "1", 128 of "2", 2048 of "3", 32768 of "4".
+                    ds1.addData(key++, i);
+                    ds3.addData(key++, i);
+
+                    // Cache2 keys count: 1 of "5", 128 of "3", 32768 of "1".
+                    if ((i & 1) == 0) {
+                        ds2.addData(key++, 5 - i);
+                        ds4.addData(key++, 5 - i);
+                    }
+                }
+            }
+        }
+
+        awaitPartitionMapExchange(true, true, null);
+
+        List<List<?>> rows;
+
+        // Check 2 partitioned caches.
+        rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(3, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT ALL SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(34817, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+        // Check 1 replicated and 1 partitioned caches.
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(3, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2\".table2");
+
+        assertEquals(34817, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+        // Check 2 replicated caches.
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(3, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(34817, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+        assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+        assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+    }
+
     /**
      * Execute SQL statement on given node.
      *
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
new file mode 100644
index 0000000..772c246
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+
+/**
+ * Test execution of MINUS (EXCEPT) operator.
+ */
+public class MinusExecutionTest extends AbstractExecutionTest {
+    /**
+     * @throws Exception If failed.
+     */
+    @Before
+    @Override public void setup() throws Exception {
+        nodesCnt = 1;
+        super.setup();
+    }
+
+    /** Checks a SINGLE minus node without the "all" flag. */
+    @Test
+    public void testSingle() {
+        checkMinus(true, false);
+    }
+
+    /** Checks a SINGLE minus node with the "all" flag set. */
+    @Test
+    public void testSingleAll() {
+        checkMinus(true, true);
+    }
+
+    /** Checks a MAP minus node followed by a REDUCE minus node without the "all" flag. */
+    @Test
+    public void testMapReduce() {
+        checkMinus(false, false);
+    }
+
+    /** Checks a MAP minus node followed by a REDUCE minus node with the "all" flag set. */
+    @Test
+    public void testMapReduceAll() {
+        checkMinus(false, true);
+    }
+
+    /** Checks that a SINGLE minus node does not request the third input once the result is already empty. */
+    @Test
+    public void testSingleWithEmptySet() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+
+        List<Object[]> data = Arrays.asList(
+            row("Igor", 1),
+            row("Roman", 1)
+        );
+
+        // For "single minus" operation, node should not request rows from the next source if result after the previous
+        // source is already empty.
+        ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, data);
+        ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, data);
+        Node<Object[]> node3 = new AbstractNode<Object[]>(ctx, rowType) {
+            @Override protected void rewindInternal() {
+                // No-op.
+            }
+
+            @Override protected Downstream<Object[]> requestDownstream(int idx) {
+                return null;
+            }
+
+            @Override public void request(int rowsCnt) throws Exception {
+                fail("Node should not be requested");
+            }
+        };
+
+        MinusNode<Object[]> minusNode = new MinusNode<>(ctx, rowType, SINGLE, false, rowFactory());
+        minusNode.register(Arrays.asList(scan1, scan2, node3));
+
+        RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+        root.register(minusNode);
+
+        assertFalse(root.hasNext());
+    }
+
+    /**
+     * @param single {@code true} to run one SINGLE node, {@code false} to run a MAP node followed by a REDUCE node.
+     * @param all Value of the "all" flag passed to every minus node; when set, duplicate rows are expected.
+     */
+    private void checkMinus(boolean single, boolean all) {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+
+        ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 1),
+            row("Roman", 1),
+            row("Igor", 1),
+            row("Roman", 2),
+            row("Igor", 1),
+            row("Igor", 1),
+            row("Igor", 2)
+        ));
+
+        ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, Arrays.asList(
+            row("Igor", 1),
+            row("Roman", 1),
+            row("Igor", 1),
+            row("Alexey", 1)
+        ));
+
+        MinusNode<Object[]> minusNode;
+
+        if (single) {
+            minusNode = new MinusNode<>(
+                ctx,
+                rowType,
+                SINGLE,
+                all,
+                rowFactory()
+            );
+        }
+        else {
+            minusNode =  new MinusNode<>(
+                ctx,
+                rowType,
+                MAP,
+                all,
+                rowFactory()
+            );
+        }
+
+        minusNode.register(Arrays.asList(scan1, scan2));
+
+        if (!single) {
+            MinusNode<Object[]> reduceNode = new MinusNode<>(
+                ctx,
+                rowType,
+                REDUCE,
+                all,
+                rowFactory()
+            );
+
+            reduceNode.register(Collections.singletonList(minusNode));
+
+            minusNode = reduceNode;
+        }
+
+        Comparator<Object[]> cmp = ctx.expressionFactory().comparator(RelCollations.of(ImmutableIntList.of(0, 1)));
+
+        // Create sort node on the top to check sorted results.
+        SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType, cmp);
+        sortNode.register(minusNode);
+
+        RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+        root.register(sortNode);
+
+        assertTrue(root.hasNext());
+
+        if (all) {
+            Assert.assertArrayEquals(row("Igor", 1), root.next());
+            Assert.assertArrayEquals(row("Igor", 1), root.next());
+        }
+
+        Assert.assertArrayEquals(row("Igor", 2), root.next());
+        Assert.assertArrayEquals(row("Roman", 2), root.next());
+
+        assertFalse(root.hasNext());
+    }
+
+    /** Creates a row factory that uses {@link ArrayRowHandler} and returns the passed fields array as the row. */
+    protected RowHandler.RowFactory<Object[]> rowFactory() {
+        return new RowHandler.RowFactory<Object[]>() {
+            /** {@inheritDoc} */
+            @Override public RowHandler<Object[]> handler() {
+                return ArrayRowHandler.INSTANCE;
+            }
+
+            /** Not expected to be called in these tests, so it fails with an assertion error. */
+            @Override public Object[] create() {
+                throw new AssertionError();
+            }
+
+            /** {@inheritDoc} */
+            @Override public Object[] create(Object... fields) {
+                return fields;
+            }
+        };
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java
new file mode 100644
index 0000000..c9cb0cc
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Predicate;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to verify EXCEPT operator.
+ */
+public class ExceptPlannerTest extends AbstractPlannerTest {
+    /** Public schema. */
+    private IgniteSchema publicSchema;
+
+    /** Last error. */
+    private String lastError;
+
+    /** {@inheritDoc} */
+    @Before
+    @Override public void setup() {
+        super.setup();
+
+        publicSchema = new IgniteSchema("PUBLIC");
+
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        RelDataType type = new RelDataTypeFactory.Builder(f)
+            .add("ID", f.createJavaType(Integer.class))
+            .add("NAME", f.createJavaType(String.class))
+            .add("SALARY", f.createJavaType(Double.class))
+            .build();
+
+        createTable("RANDOM_TBL1", type, IgniteDistributions.random(), null);
+        createTable("RANDOM_TBL2", type, IgniteDistributions.random(), null);
+        createTable("BROADCAST_TBL1", type, IgniteDistributions.broadcast(), null);
+        createTable("BROADCAST_TBL2", type, IgniteDistributions.broadcast(), null);
+        createTable("SINGLE_TBL1", type, IgniteDistributions.single(), null);
+        createTable("SINGLE_TBL2", type, IgniteDistributions.single(), null);
+
+        List<List<UUID>> assignment = Arrays.asList(
+            select(nodes, 0, 1),
+            select(nodes, 1, 2),
+            select(nodes, 2, 0),
+            select(nodes, 0, 1),
+            select(nodes, 1, 2)
+        );
+
+        createTable("AFFINITY_TBL1", type, IgniteDistributions.affinity(0, "Test1", "hash"),
+            assignment);
+
+        createTable("AFFINITY_TBL2", type, IgniteDistributions.affinity(0, "Test2", "hash"),
+            assignment);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptRandom() throws Exception {
+        String sql = "" +
+            "SELECT * FROM random_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM random_tbl2 ";
+
+        assertPlan(sql, isInstanceOf(IgniteReduceMinus.class).and(n -> !n.all)
+            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+                .and(input(0, isTableScan("random_tbl1")))
+                .and(input(1, isTableScan("random_tbl2")))
+            ))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptAllRandom() throws Exception {
+        String sql = "" +
+            "SELECT * FROM random_tbl1 " +
+            "EXCEPT ALL " +
+            "SELECT * FROM random_tbl2 ";
+
+        assertPlan(sql, isInstanceOf(IgniteReduceMinus.class).and(n -> n.all)
+            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+                .and(input(0, isTableScan("random_tbl1")))
+                .and(input(1, isTableScan("random_tbl2")))
+            ))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptBroadcast() throws Exception {
+        String sql = "" +
+            "SELECT * FROM broadcast_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM broadcast_tbl2 ";
+
+        assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+            .and(input(0, isTableScan("broadcast_tbl1")))
+            .and(input(1, isTableScan("broadcast_tbl2")))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptSingle() throws Exception {
+        String sql = "" +
+            "SELECT * FROM single_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM single_tbl2 ";
+
+        assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+            .and(input(0, isTableScan("single_tbl1")))
+            .and(input(1, isTableScan("single_tbl2"))));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptSingleAndRandom() throws Exception {
+        String sql = "" +
+            "SELECT * FROM single_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM random_tbl1 ";
+
+        assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+            .and(input(0, isTableScan("single_tbl1")))
+            .and(input(1, hasChildThat(isTableScan("random_tbl1")))));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptSingleAndAffinity() throws Exception {
+        String sql = "" +
+            "SELECT * FROM single_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM affinity_tbl1 ";
+
+        assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+            .and(input(0, isTableScan("single_tbl1")))
+            .and(input(1, hasChildThat(isTableScan("affinity_tbl1")))));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptSingleAndBroadcast() throws Exception {
+        String sql = "" +
+            "SELECT * FROM single_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM broadcast_tbl1 ";
+
+        assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+            .and(input(0, isTableScan("single_tbl1")))
+            .and(input(1, isTableScan("broadcast_tbl1")))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptAffinity() throws Exception {
+        String sql = "" +
+            "SELECT * FROM affinity_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM affinity_tbl2 ";
+
+        assertPlan(sql, isInstanceOf(IgniteReduceMinus.class)
+            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+                .and(input(0, isTableScan("affinity_tbl1")))
+                .and(input(1, isTableScan("affinity_tbl2")))
+            ))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptBroadcastAndRandom() throws Exception {
+        String sql = "" +
+            "SELECT * FROM random_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM broadcast_tbl1 ";
+
+        assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+            .and(input(0, hasChildThat(isTableScan("random_tbl1"))))
+            .and(input(1, isTableScan("broadcast_tbl1")))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptRandomNested() throws Exception {
+        String sql = "" +
+            "SELECT * FROM random_tbl2 EXCEPT (" +
+            "   SELECT * FROM random_tbl1 " +
+            "   EXCEPT " +
+            "   SELECT * FROM random_tbl2" +
+            ")";
+
+        assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+            .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
+            .and(input(1, isInstanceOf(IgniteReduceMinus.class)
+                .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+                    .and(input(0, isTableScan("random_tbl1")))
+                    .and(input(1, isTableScan("random_tbl2")))
+                ))
+            ))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptBroadcastAndRandomNested() throws Exception {
+        String sql = "" +
+            "SELECT * FROM broadcast_tbl1 EXCEPT (" +
+            "   SELECT * FROM random_tbl1 " +
+            "   EXCEPT " +
+            "   SELECT * FROM random_tbl2" +
+            ")";
+
+        assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+            .and(input(0, isTableScan("broadcast_tbl1")))
+            .and(input(1, isInstanceOf(IgniteReduceMinus.class)
+                .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+                    .and(input(0, isTableScan("random_tbl1")))
+                    .and(input(1, isTableScan("random_tbl2")))
+                ))
+            ))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptMerge() throws Exception {
+        String sql = "" +
+            "SELECT * FROM random_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM random_tbl2 " +
+            "EXCEPT " +
+            "SELECT * FROM affinity_tbl1 " +
+            "EXCEPT " +
+            "SELECT * FROM affinity_tbl2 ";
+
+        assertPlan(sql, isInstanceOf(IgniteReduceMinus.class)
+            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+                .and(input(0, isTableScan("random_tbl1")))
+                .and(input(1, isTableScan("random_tbl2")))
+                .and(input(2, isTableScan("affinity_tbl1")))
+                .and(input(3, isTableScan("affinity_tbl2")))
+            ))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptAllMerge() throws Exception {
+        String sql = "" +
+            "SELECT * FROM random_tbl1 " +
+            "EXCEPT ALL " +
+            "SELECT * FROM random_tbl2 " +
+            "EXCEPT ALL " +
+            "SELECT * FROM affinity_tbl1 " +
+            "EXCEPT ALL " +
+            "SELECT * FROM affinity_tbl2 ";
+
+        assertPlan(sql, isInstanceOf(IgniteReduceMinus.class).and(n -> n.all)
+            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+                .and(input(0, isTableScan("random_tbl1")))
+                .and(input(1, isTableScan("random_tbl2")))
+                .and(input(2, isTableScan("affinity_tbl1")))
+                .and(input(3, isTableScan("affinity_tbl2")))
+            ))
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExceptAllWithExceptMerge() throws Exception {
+        String sql = "" +
+            "SELECT * FROM random_tbl1 " +
+            "EXCEPT ALL " +
+            "SELECT * FROM random_tbl2 " +
+            "EXCEPT " +
+            "SELECT * FROM affinity_tbl1 ";
+
+        assertPlan(sql, isInstanceOf(IgniteReduceMinus.class).and(n -> !n.all)
+            .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+                .and(input(0, isTableScan("random_tbl1")))
+                .and(input(1, isTableScan("random_tbl2")))
+                .and(input(2, isTableScan("affinity_tbl1")))
+            ))
+        );
+    }
+
+    /** Builds the physical plan for the query and fails with the plan dump if the predicate rejects it. */
+    private <T extends RelNode> void assertPlan(String sql, Predicate<T> predicate) throws Exception {
+        IgniteRel plan = physicalPlan(sql, publicSchema);
+
+        if (!predicate.test((T)plan)) {
+            String invalidPlanMsg = "Invalid plan (" + lastError + "):\n" +
+                RelOptUtil.toString(plan, SqlExplainLevel.ALL_ATTRIBUTES);
+
+            fail(invalidPlanMsg);
+        }
+    }
+
+    /** Predicate that checks the node class and stores a description of the mismatch in {@code lastError}. */
+    private <T extends RelNode> Predicate<T> isInstanceOf(Class<T> cls) {
+        return node -> {
+            if (cls.isInstance(node))
+                return true;
+
+            lastError = "Unexpected node class [node=" + node + ", cls=" + cls.getSimpleName() + ']';
+
+            return false;
+        };
+    }
+
+    /** Predicate matching an {@link IgniteTableScan} over the table with the given name (case-insensitive). */
+    private Predicate<IgniteTableScan> isTableScan(String tableName) {
+        return isInstanceOf(IgniteTableScan.class).and(
+            n -> {
+                String scanTableName = n.getTable().unwrap(TestTable.class).name();
+
+                if (tableName.equalsIgnoreCase(scanTableName))
+                    return true;
+
+                lastError = "Unexpected table name [exp=" + tableName + ", act=" + scanTableName + ']';
+
+                return false;
+            });
+    }
+
+    /** Predicate that passes if any descendant of the node (the node itself is not checked) matches. */
+    private <T extends RelNode> Predicate<RelNode> hasChildThat(Predicate<T> predicate) {
+        return new Predicate<RelNode>() {
+            public boolean checkRecursively(RelNode node) {
+                if (predicate.test((T)node))
+                    return true;
+
+                for (RelNode input : node.getInputs()) {
+                    if (checkRecursively(input))
+                        return true;
+                }
+
+                return false;
+            }
+
+            @Override public boolean test(RelNode node) {
+                for (RelNode input : node.getInputs()) {
+                    if (checkRecursively(input))
+                        return true;
+                }
+
+                lastError = "Not found child for defined condition [node=" + node + ']';
+
+                return false;
+            }
+        };
+    }
+
+    /** Predicate that applies the given predicate to the first input of the node; fails if there are no inputs. */
+    private <T extends RelNode> Predicate<RelNode> input(Predicate<T> predicate) {
+        return node -> {
+            if (F.isEmpty(node.getInputs())) {
+                lastError = "No inputs for node [node=" + node + ']';
+
+                return false;
+            }
+
+            return predicate.test((T)node.getInput(0));
+        };
+    }
+
+    /** Predicate that applies the given predicate to the input at {@code idx}; fails if there is no such input. */
+    private <T extends RelNode> Predicate<RelNode> input(int idx, Predicate<T> predicate) {
+        return node -> {
+            if (F.size(node.getInputs()) <= idx) {
+                lastError = "No input for node [idx=" + idx + ", node=" + node + ']';
+
+                return false;
+            }
+
+            return predicate.test((T)node.getInput(idx));
+        };
+    }
+
+    /** Adds to the public schema a test table with the given distribution and, if not empty, assignment. */
+    private void createTable(String name, RelDataType type, IgniteDistribution distr, List<List<UUID>> assignment) {
+        TestTable table = new TestTable(type) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                if (F.isEmpty(assignment))
+                    return super.colocationGroup(ctx);
+                else
+                    return ColocationGroup.forAssignments(assignment);
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return distr;
+            }
+
+            @Override public String name() {
+                return name;
+            }
+        };
+
+        publicSchema.addTable(name, table);
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index 2ab4fcb..185617b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregat
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateSingleGroupExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashIndexSpoolExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
@@ -45,6 +46,7 @@ import org.junit.runners.Suite;
     HashAggregateExecutionTest.class,
     HashAggregateSingleGroupExecutionTest.class,
     SortAggregateExecutionTest.class,
+    MinusExecutionTest.class
 })
 public class ExecutionTestSuite {
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index a6d7767..c466f72 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
@@ -45,6 +46,7 @@ import org.junit.runners.Suite;
     HashAggregatePlannerTest.class,
     SortAggregatePlannerTest.class,
     JoinColocationPlannerTest.class,
+    ExceptPlannerTest.class
 })
 public class PlannerTestSuite {
 }