You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/04/22 14:05:22 UTC
[ignite] branch sql-calcite updated: IGNITE-13691 Support of EXCEPT
operator - Fixes #9009.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new a651823 IGNITE-13691 Support of EXCEPT operator - Fixes #9009.
a651823 is described below
commit a65182349eb8f2820225bf356020d487bd4b8eaf
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Thu Apr 22 16:39:04 2021 +0300
IGNITE-13691 Support of EXCEPT operator - Fixes #9009.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../query/calcite/exec/ExecutionServiceImpl.java | 1 +
.../query/calcite/exec/LogicalRelImplementor.java | 35 ++
.../query/calcite/exec/exp/agg/GroupKey.java | 5 +
.../query/calcite/exec/rel/MinusNode.java | 387 +++++++++++++++++
.../query/calcite/exec/rel/RootNode.java | 4 +-
.../query/calcite/externalize/RelJson.java | 1 +
.../query/calcite/metadata/IgniteMdRowCount.java | 8 +
.../processors/query/calcite/prepare/Cloner.java | 20 +
.../query/calcite/prepare/IgniteRelShuttle.java | 18 +
.../query/calcite/prepare/PlannerPhase.java | 4 +
.../query/calcite/rel/IgniteRelVisitor.java | 18 +
.../query/calcite/rel/set/IgniteMapMinus.java | 141 +++++++
.../query/calcite/rel/set/IgniteMinusBase.java | 100 +++++
.../query/calcite/rel/set/IgniteReduceMinus.java | 135 ++++++
.../query/calcite/rel/set/IgniteSingleMinus.java | 134 ++++++
.../query/calcite/rule/MinusConverterRule.java | 94 +++++
.../query/calcite/CalciteQueryProcessorTest.java | 240 ++++++++++-
.../query/calcite/exec/rel/MinusExecutionTest.java | 222 ++++++++++
.../query/calcite/planner/ExceptPlannerTest.java | 468 +++++++++++++++++++++
.../ignite/testsuites/ExecutionTestSuite.java | 2 +
.../apache/ignite/testsuites/PlannerTestSuite.java | 2 +
21 files changed, 2028 insertions(+), 11 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index b3b0b6c..ab3bb4e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -563,6 +563,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
case WITH:
case VALUES:
case UNION:
+ case EXCEPT:
return prepareQuery(sqlNode, ctx);
case INSERT:
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index f9cfcf5..873a2f0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ModifyNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
@@ -83,6 +84,10 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMinusBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
@@ -424,6 +429,36 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
+ @Override public Node<Row> visit(IgniteSingleMinus rel) {
+ return visit(rel, AggregateType.SINGLE);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Node<Row> visit(IgniteMapMinus rel) {
+ return visit(rel, AggregateType.MAP);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Node<Row> visit(IgniteReduceMinus rel) {
+ return visit(rel, AggregateType.REDUCE);
+ }
+
+ /** Visit IgniteMinus rel. */
+ private Node<Row> visit(IgniteMinusBase rel, AggregateType aggType) {
+ RelDataType rowType = rel.getRowType();
+
+ RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+ List<Node<Row>> inputs = Commons.transform(rel.getInputs(), this::visit);
+
+ MinusNode<Row> node = new MinusNode<>(ctx, rowType, aggType, rel.all, rowFactory);
+
+ node.register(inputs);
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
@Override public Node<Row> visit(IgniteTableModify rel) {
switch (rel.getOperation()) {
case INSERT:
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
index 2796a14..efdf857 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
@@ -40,6 +40,11 @@ public class GroupKey {
return fields[idx];
}
+ /** */
+ public int fieldsCount() {
+ return fields.length;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
new file mode 100644
index 0000000..4859119
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Execution node for MINUS (EXCEPT) operator.
+ */
+public class MinusNode<Row> extends AbstractNode<Row> {
+ /** */
+ private final AggregateType type;
+
+ /** */
+ private final boolean all;
+
+ /** */
+ private final RowFactory<Row> rowFactory;
+
+ /** */
+ private final Grouping grouping;
+
+ /** */
+ private int requested;
+
+ /** */
+ private int waiting;
+
+ /** Current source index. */
+ private int curSrcIdx;
+
+ /** */
+ private boolean inLoop;
+
+ /**
+ * @param ctx Execution context.
+ */
+ public MinusNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+ RowFactory<Row> rowFactory) {
+ super(ctx, rowType);
+
+ this.all = all;
+ this.type = type;
+ this.rowFactory = rowFactory;
+
+ grouping = new Grouping();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void request(int rowsCnt) throws Exception {
+ assert !F.isEmpty(sources());
+ assert rowsCnt > 0 && requested == 0;
+ assert waiting <= 0;
+
+ checkState();
+
+ requested = rowsCnt;
+
+ if (waiting == 0)
+ sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+ else if (!inLoop)
+ context().execute(this::flush, this::onError);
+ }
+
+ /** */
+ public void push(Row row, int idx) throws Exception {
+ assert downstream() != null;
+ assert waiting > 0;
+
+ checkState();
+
+ waiting--;
+
+ grouping.add(row, idx);
+
+ if (waiting == 0)
+ sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+ }
+
+ /** */
+ public void end(int idx) throws Exception {
+ assert downstream() != null;
+ assert waiting > 0;
+ assert curSrcIdx == idx;
+
+ checkState();
+
+ if (type == AggregateType.SINGLE && grouping.isEmpty())
+ curSrcIdx = sources().size(); // Skip subsequent sources.
+ else
+ curSrcIdx++;
+
+ if (curSrcIdx >= sources().size()) {
+ waiting = -1;
+
+ flush();
+ }
+ else
+ sources().get(curSrcIdx).request(waiting);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void rewindInternal() {
+ requested = 0;
+ waiting = 0;
+ grouping.groups.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Downstream<Row> requestDownstream(int idx) {
+ return new Downstream<Row>() {
+ @Override public void push(Row row) throws Exception {
+ MinusNode.this.push(row, idx);
+ }
+
+ @Override public void end() throws Exception {
+ MinusNode.this.end(idx);
+ }
+
+ @Override public void onError(Throwable e) {
+ MinusNode.this.onError(e);
+ }
+ };
+ }
+
+ /** */
+ private void flush() throws Exception {
+ if (isClosed())
+ return;
+
+ checkState();
+
+ assert waiting == -1;
+
+ int processed = 0;
+
+ inLoop = true;
+
+ try {
+ while (requested > 0 && !grouping.isEmpty()) {
+ int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
+
+ for (Row row : grouping.getRows(toSnd)) {
+ checkState();
+
+ requested--;
+ downstream().push(row);
+
+ processed++;
+ }
+
+ if (processed >= IN_BUFFER_SIZE && requested > 0) {
+ // Allow others to do their job.
+ context().execute(this::flush, this::onError);
+
+ return;
+ }
+ }
+ }
+ finally {
+ inLoop = false;
+ }
+
+ if (requested > 0) {
+ requested = 0;
+
+ downstream().end();
+ }
+ }
+
+ /** */
+ private class Grouping {
+ /**
+ * Value in this map will always have 2 elements, first - count of keys in the first set, second - count of
+ * keys in all sets except first.
+ */
+ private final Map<GroupKey, int[]> groups = new HashMap<>();
+
+ /** */
+ private final RowHandler<Row> hnd;
+
+ /** */
+ private Grouping() {
+ hnd = context().rowHandler();
+ }
+
+ /** */
+ private void add(Row row, int setIdx) {
+ if (type == AggregateType.REDUCE) {
+ assert setIdx == 0 : "Unexpected set index: " + setIdx;
+
+ addOnReducer(row);
+ }
+ else if (type == AggregateType.MAP)
+ addOnMapper(row, setIdx);
+ else
+ addOnSingle(row, setIdx);
+ }
+
+ /**
+ * @param cnt Number of rows.
+ *
+ * @return Actually sent rows number.
+ */
+ private List<Row> getRows(int cnt) {
+ if (F.isEmpty(groups))
+ return Collections.emptyList();
+ else if (type == AggregateType.MAP)
+ return getOnMapper(cnt);
+ else
+ return getOnSingleOrReducer(cnt);
+ }
+
+ /** */
+ private GroupKey key(Row row) {
+ int size = hnd.columnCount(row);
+
+ Object[] fields = new Object[size];
+
+ for (int i = 0; i < size; i++)
+ fields[i] = hnd.get(i, row);
+
+ return new GroupKey(fields);
+ }
+
+ /** */
+ private void addOnSingle(Row row, int setIdx) {
+ int[] cntrs;
+
+ GroupKey key = key(row);
+
+ if (setIdx == 0) {
+ cntrs = groups.computeIfAbsent(key, k -> new int[2]);
+
+ cntrs[0]++;
+ }
+ else {
+ cntrs = groups.get(key);
+
+ if (cntrs != null) {
+ cntrs[1]++;
+
+ if (cntrs[1] >= cntrs[0])
+ groups.remove(key);
+ }
+ }
+ }
+
+ /** */
+ private void addOnMapper(Row row, int setIdx) {
+ int[] cntrs = groups.computeIfAbsent(key(row), k -> new int[2]);
+
+ cntrs[setIdx == 0 ? 0 : 1]++;
+ }
+
+ /** */
+ private void addOnReducer(Row row) {
+ GroupKey grpKey = (GroupKey)hnd.get(0, row);
+
+ int[] cntrs = groups.computeIfAbsent(grpKey, k -> new int[2]);
+
+ int[] cntrsMap = (int[])hnd.get(1, row);
+
+ for (int i = 0; i < cntrsMap.length; i++)
+ cntrs[i] += cntrsMap[i];
+ }
+
+ /** */
+ private List<Row> getOnMapper(int cnt) {
+ Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
+
+ int amount = Math.min(cnt, groups.size());
+ List<Row> res = new ArrayList<>(amount);
+
+ while (amount > 0 && it.hasNext()) {
+ Map.Entry<GroupKey, int[]> entry = it.next();
+
+ // Skip row if it doesn't affect the final result.
+ if (entry.getValue()[0] != entry.getValue()[1]) {
+ res.add(rowFactory.create(entry.getKey(), entry.getValue()));
+
+ amount--;
+ }
+
+ it.remove();
+ }
+
+ return res;
+ }
+
+ /** */
+ private List<Row> getOnSingleOrReducer(int cnt) {
+ Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
+
+ List<Row> res = new ArrayList<>(cnt);
+
+ while (it.hasNext() && cnt > 0) {
+ Map.Entry<GroupKey, int[]> entry = it.next();
+
+ GroupKey key = entry.getKey();
+
+ Object[] fields = new Object[key.fieldsCount()];
+
+ for (int i = 0; i < fields.length; i++)
+ fields[i] = key.field(i);
+
+ Row row = rowFactory.create(fields);
+
+ int[] cntrs = entry.getValue();
+
+ int availableRows = availableRows(entry.getValue());
+
+ if (availableRows <= cnt){
+ it.remove();
+
+ if (availableRows == 0)
+ continue;
+
+ cnt -= availableRows;
+ }
+ else {
+ availableRows = cnt;
+
+ decrementAvailableRows(cntrs, availableRows);
+
+ cnt = 0;
+ }
+
+ for (int i = 0; i < availableRows; i++)
+ res.add(row);
+ }
+
+ return res;
+ }
+
+ /** */
+ private int availableRows(int[] cntrs) {
+ assert cntrs.length == 2;
+
+ if (all)
+ return Math.max(cntrs[0] - cntrs[1], 0);
+ else
+ return cntrs[1] == 0 ? 1 : 0;
+ }
+
+ /** */
+ private void decrementAvailableRows(int[] cntrs, int amount) {
+ assert amount > 0;
+ assert all;
+
+ cntrs[0] -= amount;
+ }
+
+
+ /** */
+ private boolean isEmpty() {
+ return groups.isEmpty();
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 9bdda10..a4ce9ee 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -140,10 +140,10 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
/** {@inheritDoc} */
@Override public void push(Row row) throws Exception {
- assert waiting > 0;
-
lock.lock();
try {
+ assert waiting > 0;
+
checkState();
waiting--;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
index d454b9a..cf024a7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -218,6 +218,7 @@ class RelJson {
ImmutableList.of(
"org.apache.ignite.internal.processors.query.calcite.rel.",
"org.apache.ignite.internal.processors.query.calcite.rel.agg.",
+ "org.apache.ignite.internal.processors.query.calcite.rel.set.",
"org.apache.calcite.rel.",
"org.apache.calcite.rel.core.",
"org.apache.calcite.rel.logical.",
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 8be9f1a..4b01441 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -31,6 +31,7 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMinusBase;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
@@ -114,4 +115,11 @@ public class IgniteMdRowCount extends RelMdRowCount {
public double getRowCount(IgniteSortedIndexSpool rel, RelMetadataQuery mq) {
return rel.estimateRowCount(mq);
}
+
+ /**
+ * Estimation of row count for MINUS (EXCEPT) operator.
+ */
+ public double getRowCount(IgniteMinusBase rel, RelMetadataQuery mq) {
+ return rel.estimateRowCount(mq);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 5ef854a..2a8b410 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -46,6 +46,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
@@ -215,15 +218,32 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteReduceSortAggregate rel) {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteHashIndexSpool rel) {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSingleMinus rel) {
+ return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteMapMinus rel) {
+ return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReduceMinus rel) {
+ return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteRel rel) {
return rel.accept(this);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index a069f19..6397b30 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -46,6 +46,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/** */
@@ -180,6 +183,21 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
return rel.accept(this);
}
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSingleMinus rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteMapMinus rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReduceMinus rel) {
+ return processNode(rel);
+ }
+
/**
* Visits all children of a parent.
*/
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 3dced8d..ccfce87 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMerge
import org.apache.ignite.internal.processors.query.calcite.rule.HashAggregateConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.MergeJoinConverterRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.MinusConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.NestedLoopJoinConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
@@ -137,6 +138,7 @@ public enum PlannerPhase {
.anyInputs()).toRule(),
CoreRules.UNION_MERGE,
+ CoreRules.MINUS_MERGE,
CoreRules.UNION_REMOVE,
CoreRules.JOIN_COMMUTE,
CoreRules.AGGREGATE_REMOVE,
@@ -168,6 +170,8 @@ public enum PlannerPhase {
HashAggregateConverterRule.MAP_REDUCE,
SortAggregateConverterRule.SINGLE,
SortAggregateConverterRule.MAP_REDUCE,
+ MinusConverterRule.SINGLE,
+ MinusConverterRule.MAP_REDUCE,
MergeJoinConverterRule.INSTANCE,
NestedLoopJoinConverterRule.INSTANCE,
ProjectConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 072b587e..cab5c5f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -23,6 +23,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
/**
* A visitor to traverse an Ignite relational nodes tree.
@@ -154,6 +157,21 @@ public interface IgniteRelVisitor<T> {
T visit(IgniteHashIndexSpool rel);
/**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
+ T visit(IgniteSingleMinus rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
+ T visit(IgniteMapMinus rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
+ T visit(IgniteReduceMinus rel);
+
+ /**
* Visits a relational node and calculates a result on the basis of node meta information.
* @param rel Relational node.
* @return Visit result.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
new file mode 100644
index 0000000..061cc1c
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of MINUS (EXCEPT) operator.
+ */
+public class IgniteMapMinus extends IgniteMinusBase {
+ /** */
+ public IgniteMapMinus(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ boolean all
+ ) {
+ super(cluster, traitSet, inputs, all);
+ }
+
+ /** */
+ public IgniteMapMinus(RelInput input) {
+ super(input);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteMapMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new IgniteMapMinus(getCluster(), traitSet, inputs, all);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteMapMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected RelDataType deriveRowType() {
+ RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
+
+ assert typeFactory instanceof IgniteTypeFactory;
+
+ RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+
+ builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
+ builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
+
+ return builder.build();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ boolean rewindable = inputTraits.stream()
+ .map(TraitUtils::rewindability)
+ .allMatch(RewindabilityTrait::rewindable);
+
+ if (rewindable)
+ return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+ return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+ Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+ return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+ if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+ return ImmutableList.of(); // Mixing of single and random is prohibited.
+
+ return ImmutableList.of(
+ Pair.of(nodeTraits.replace(IgniteDistributions.random()), Commons.transform(inputTraits,
+ t -> t.replace(IgniteDistributions.random())))
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
+ Set<CorrelationId> correlationIds = inTraits.stream()
+ .map(TraitUtils::correlation)
+ .flatMap(corrTr -> corrTr.correlationIds().stream())
+ .collect(Collectors.toSet());
+
+ return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(correlationIds)),
+ inTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int aggregateFieldsCount() {
+ return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
new file mode 100644
index 0000000..b2cf77b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Base class for physical MINUS (EXCEPT) set op.
+ */
+public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgniteRel {
+ /** Count of counter fields used to aggregate results. */
+ protected static final int COUNTER_FIELDS_CNT = 2;
+
+ /** */
+ IgniteMinusBase(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+ super(cluster, traits, inputs, all);
+ }
+
+ /** {@inheritDoc} */
+ protected IgniteMinusBase(RelInput input) {
+ super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ // Operation erases collation.
+ return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+ Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ // Operation erases collation.
+ return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
+ Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY))));
+ }
+
+ /** Gets count of fields for aggregation for this node. Required for memory consumption calculation. */
+ protected abstract int aggregateFieldsCount();
+
+ /** {@inheritDoc} */
+ @Override public double estimateRowCount(RelMetadataQuery mq) {
+ final List<RelNode> inputs = getInputs();
+
+ double rows = mq.getRowCount(inputs.get(0));
+
+ for (int i = 1; i < inputs.size(); i++)
+ rows -= 0.5 * Math.min(rows, mq.getRowCount(inputs.get(i)));
+
+ return rows;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+ double rows = estimateRowCount(mq);
+
+ double inputRows = 0;
+
+ for (RelNode input : getInputs())
+ inputRows += mq.getRowCount(input);
+
+ double mem = 0.5 * inputRows * aggregateFieldsCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+
+ return costFactory.makeCost(rows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
new file mode 100644
index 0000000..c068e8e
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for REDUCE phase of MINUS (EXCEPT) operator.
+ */
+public class IgniteReduceMinus extends IgniteMinusBase {
+ /** */
+ public IgniteReduceMinus(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode input,
+ boolean all,
+ RelDataType rowType
+ ) {
+ super(cluster, traitSet, ImmutableList.of(input), all);
+
+ this.rowType = rowType;
+ }
+
+ /** */
+ public IgniteReduceMinus(RelInput input) {
+ this(
+ input.getCluster(),
+ input.getTraitSet().replace(IgniteConvention.INSTANCE),
+ input.getInput(),
+ input.getBoolean("all", false),
+ input.getRowType("rowType")
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelWriter explainTerms(RelWriter pw) {
+ super.explainTerms(pw)
+ .itemIf("rowType", rowType, pw.getDetailLevel() == SqlExplainLevel.ALL_ATTRIBUTES);
+
+ return pw;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ return ImmutableList.of(
+ Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
+
+ if (IgniteDistributions.single().satisfies(distr)) {
+ return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
+ Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()),
+ ImmutableList.of(sole(inputTraits).replace(IgniteDistributions.single()))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+ inTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new IgniteReduceMinus(getCluster(), traitSet, sole(inputs), all, rowType);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteReduceMinus clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteReduceMinus(cluster, getTraitSet(), sole(inputs), all, rowType);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int aggregateFieldsCount() {
+ return rowType.getFieldCount() + COUNTER_FIELDS_CNT;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
new file mode 100644
index 0000000..3165bb5
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MINUS (EXCEPT) operator which inputs satisfy SINGLE distribution.
+ */
+public class IgniteSingleMinus extends IgniteMinusBase {
+ /** {@inheritDoc} */
+ public IgniteSingleMinus(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ boolean all
+ ) {
+ super(cluster, traitSet, inputs, all);
+ }
+
+ /** */
+ public IgniteSingleMinus(RelInput input) {
+ super(input);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ boolean rewindable = inputTraits.stream()
+ .map(TraitUtils::rewindability)
+ .allMatch(RewindabilityTrait::rewindable);
+
+ if (rewindable)
+ return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+ return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+ Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
+ IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
+
+ if (IgniteDistributions.single().satisfies(distr)) {
+ return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
+ Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ boolean single = inputTraits.stream()
+ .map(TraitUtils::distribution)
+ .allMatch(d -> d.satisfies(IgniteDistributions.single()));
+
+ if (!single)
+ return ImmutableList.of();
+
+ return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), inputTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
+ Set<CorrelationId> correlationIds = inTraits.stream()
+ .map(TraitUtils::correlation)
+ .flatMap(corrTr -> corrTr.correlationIds().stream())
+ .collect(Collectors.toSet());
+
+ return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(correlationIds)),
+ inTraits));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteSingleMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new IgniteSingleMinus(getCluster(), traitSet, inputs, all);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteSingleMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int aggregateFieldsCount() {
+ return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java
new file mode 100644
index 0000000..7e11c0c
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+
+/**
+ * MINUS (EXCEPT) operation converter rule.
+ */
+public class MinusConverterRule {
+ /** */
+ public static final RelOptRule SINGLE = new SingleMinusConverterRule();
+
+ /** */
+ public static final RelOptRule MAP_REDUCE = new MapReduceMinusConverterRule();
+
+ /** */
+ private MinusConverterRule() {
+ // No-op.
+ }
+
+ /** */
+ private static class SingleMinusConverterRule extends AbstractIgniteConverterRule<LogicalMinus> {
+ /** */
+ SingleMinusConverterRule() {
+ super(LogicalMinus.class, "SingleMinusConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalMinus setOp) {
+ RelOptCluster cluster = setOp.getCluster();
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+ List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
+
+ return new IgniteSingleMinus(cluster, outTrait, inputs, setOp.all);
+ }
+ }
+
+ /** */
+ private static class MapReduceMinusConverterRule extends AbstractIgniteConverterRule<LogicalMinus> {
+ /** */
+ MapReduceMinusConverterRule() {
+ super(LogicalMinus.class, "MapReduceMinusConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalMinus setOp) {
+ RelOptCluster cluster = setOp.getCluster();
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
+
+ RelNode map = new IgniteMapMinus(cluster, outTrait, inputs, setOp.all);
+
+ return new IgniteReduceMinus(
+ cluster,
+ outTrait.replace(IgniteDistributions.single()),
+ convert(map, inTrait.replace(IgniteDistributions.single())),
+ setOp.all,
+ cluster.getTypeFactory().leastRestrictive(Util.transform(inputs, RelNode::getRowType))
+ );
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 42e53dc..cd14468 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -26,11 +26,13 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import javax.cache.Cache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
@@ -452,13 +454,13 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
+----+-------+-------+
| ID | NAME | SALARY|
+----+-------+-------+
- | 1 | igor | 10 |
- | 2 | igor | 11 |
- | 3 | igor | 12 |
- | 4 | igor1 | 13 |
- | 5 | igor1 | 13 |
- | 6 | igor1 | 13 |
- | 7 | roman | 14 |
+ | 1 | Igor | 10 |
+ | 2 | Igor | 11 |
+ | 3 | Igor | 12 |
+ | 4 | Igor1 | 13 |
+ | 5 | Igor1 | 13 |
+ | 6 | Igor1 | 13 |
+ | 7 | Roman | 14 |
+----+-------+-------+
select * from account;
@@ -469,14 +471,33 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
| 2 | Roman | 11 |
| 3 | Roman | 12 |
| 4 | Roman | 13 |
- | 5 | igor1 | 13 |
- | 6 | igor1 | 13 |
+ | 5 | Igor1 | 13 |
+ | 6 | Igor1 | 13 |
+----+-------+-------+
*/
awaitPartitionMapExchange(true, true, null);
}
+ /** Copy cache with it's content to new replicated cache. */
+ private void copyCacheAsReplicated(String cacheName) throws InterruptedException {
+ IgniteCache<Object, Object> cache = client.cache(cacheName);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<Object, Object>(
+ cache.getConfiguration(CacheConfiguration.class));
+
+ ccfg.setName(cacheName + "Replicated");
+ ccfg.setCacheMode(CacheMode.REPLICATED);
+ ccfg.getQueryEntities().forEach(qe -> qe.setTableName(qe.getTableName() + "_repl"));
+
+ IgniteCache<Object, Object> replCache = client.getOrCreateCache(ccfg);
+
+ for (Cache.Entry<?, ?> entry : cache)
+ replCache.put(entry.getKey(), entry.getValue());
+
+ awaitPartitionMapExchange(true, true, null);
+ }
+
/** */
@Test
public void testOrderingByColumnOutsideSelectList() throws InterruptedException {
@@ -632,6 +653,207 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertEquals(1, rows.size());
}
+ /** */
+ @Test
+ public void testExcept() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name FROM Orders EXCEPT SELECT name from Account");
+
+ assertEquals(1, rows.size());
+ assertEquals("Igor", rows.get(0).get(0));
+ }
+
+ /** */
+ @Test
+ public void testExceptFromEmpty() throws Exception {
+ populateTables();
+
+ copyCacheAsReplicated("orders");
+ copyCacheAsReplicated("account");
+
+ List<List<?>> rows = sql("SELECT name FROM Orders WHERE salary < 0 EXCEPT SELECT name FROM Account");
+
+ assertEquals(0, rows.size());
+
+ rows = sql("SELECT name FROM Orders_repl WHERE salary < 0 EXCEPT SELECT name FROM Account_repl");
+
+ assertEquals(0, rows.size());
+ }
+
+ /** */
+ @Test
+ public void testExceptSeveralColumns() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name, salary FROM Orders EXCEPT SELECT name, salary from Account");
+
+ assertEquals(4, rows.size());
+ assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+ }
+
+ /** */
+ @Test
+ public void testExceptAll() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name FROM Orders EXCEPT ALL SELECT name from Account");
+
+ assertEquals(4, rows.size());
+ assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptNested() throws Exception {
+ populateTables();
+
+ List<List<?>> rows =
+ sql("SELECT name FROM Orders EXCEPT (SELECT name FROM Orders EXCEPT SELECT name from Account)");
+
+ assertEquals(2, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptReplicatedWithPartitioned() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("orders");
+
+ List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name from Account");
+
+ assertEquals(4, rows.size());
+ assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptReplicated() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("orders");
+ copyCacheAsReplicated("account");
+
+ List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name from Account_repl");
+
+ assertEquals(4, rows.size());
+ assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptMerge() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("orders");
+
+ List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name FROM Account EXCEPT ALL " +
+ "SELECT name FROM orders WHERE salary < 11");
+
+ assertEquals(3, rows.size());
+ assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptBigBatch() throws Exception {
+ client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+ .setName("cache1")
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table1")))
+ .setBackups(2)
+ );
+
+ client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+ .setName("cache2")
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table2")))
+ .setBackups(1)
+ );
+
+ copyCacheAsReplicated("cache1");
+ copyCacheAsReplicated("cache2");
+
+ try (IgniteDataStreamer<Integer, Integer> ds1 = client.dataStreamer("cache1");
+ IgniteDataStreamer<Integer, Integer> ds2 = client.dataStreamer("cache2");
+ IgniteDataStreamer<Integer, Integer> ds3 = client.dataStreamer("cache1Replicated");
+ IgniteDataStreamer<Integer, Integer> ds4 = client.dataStreamer("cache2Replicated")
+ ) {
+ int key = 0;
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < ((i == 0) ? 1 : (1 << (i * 4 - 1))); j++) {
+ // Cache1 keys count: 1 of "0", 8 of "1", 128 of "2", 2048 of "3", 32768 of "4".
+ ds1.addData(key++, i);
+ ds3.addData(key++, i);
+
+ // Cache2 keys count: 1 of "5", 128 of "3", 32768 of "1".
+ if ((i & 1) == 0) {
+ ds2.addData(key++, 5 - i);
+ ds4.addData(key++, 5 - i);
+ }
+ }
+ }
+ }
+
+ awaitPartitionMapExchange(true, true, null);
+
+ List<List<?>> rows;
+
+ // Check 2 partitioned caches.
+ rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(3, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT ALL SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(34817, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+ assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+ // Check 1 replicated and 1 partitioned caches.
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(3, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(34817, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+ assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+ // Check 2 replicated caches.
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2Replicated\"" +
+ ".table2_repl");
+
+ assertEquals(3, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2Replicated\"" +
+ ".table2_repl");
+
+ assertEquals(34817, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+ assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+ }
+
/**
* Execute SQL statement on given node.
*
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
new file mode 100644
index 0000000..772c246
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
+import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+
+/**
+ * Test execution of MINUS (EXCEPT) operator.
+ */
+public class MinusExecutionTest extends AbstractExecutionTest {
+ /**
+ * @throws Exception If failed.
+ */
+ @Before
+ @Override public void setup() throws Exception {
+ nodesCnt = 1;
+ super.setup();
+ }
+
+ /** */
+ @Test
+ public void testSingle() {
+ checkMinus(true, false);
+ }
+
+ /** */
+ @Test
+ public void testSingleAll() {
+ checkMinus(true, true);
+ }
+
+ /** */
+ @Test
+ public void testMapReduce() {
+ checkMinus(false, false);
+ }
+
+ /** */
+ @Test
+ public void testMapReduceAll() {
+ checkMinus(false, true);
+ }
+
+ /** */
+ @Test
+ public void testSingleWithEmptySet() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+
+ List<Object[]> data = Arrays.asList(
+ row("Igor", 1),
+ row("Roman", 1)
+ );
+
+ // For "single minus" operation, node should not request rows from the next source if result after the previous
+ // source is already empty.
+ ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, data);
+ ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, data);
+ Node<Object[]> node3 = new AbstractNode<Object[]>(ctx, rowType) {
+ @Override protected void rewindInternal() {
+ // No-op.
+ }
+
+ @Override protected Downstream<Object[]> requestDownstream(int idx) {
+ return null;
+ }
+
+ @Override public void request(int rowsCnt) throws Exception {
+ fail("Node should not be requested");
+ }
+ };
+
+ MinusNode<Object[]> minusNode = new MinusNode<>(ctx, rowType, SINGLE, false, rowFactory());
+ minusNode.register(Arrays.asList(scan1, scan2, node3));
+
+ RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+ root.register(minusNode);
+
+ assertFalse(root.hasNext());
+ }
+
+ /**
+ * @param single Single.
+ * @param all All.
+ */
+ private void checkMinus(boolean single, boolean all) {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
+
+ ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 1),
+ row("Roman", 1),
+ row("Igor", 1),
+ row("Roman", 2),
+ row("Igor", 1),
+ row("Igor", 1),
+ row("Igor", 2)
+ ));
+
+ ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, Arrays.asList(
+ row("Igor", 1),
+ row("Roman", 1),
+ row("Igor", 1),
+ row("Alexey", 1)
+ ));
+
+ MinusNode<Object[]> minusNode;
+
+ if (single) {
+ minusNode = new MinusNode<>(
+ ctx,
+ rowType,
+ SINGLE,
+ all,
+ rowFactory()
+ );
+ }
+ else {
+ minusNode = new MinusNode<>(
+ ctx,
+ rowType,
+ MAP,
+ all,
+ rowFactory()
+ );
+ }
+
+ minusNode.register(Arrays.asList(scan1, scan2));
+
+ if (!single) {
+ MinusNode<Object[]> reduceNode = new MinusNode<>(
+ ctx,
+ rowType,
+ REDUCE,
+ all,
+ rowFactory()
+ );
+
+ reduceNode.register(Collections.singletonList(minusNode));
+
+ minusNode = reduceNode;
+ }
+
+ Comparator<Object[]> cmp = ctx.expressionFactory().comparator(RelCollations.of(ImmutableIntList.of(0, 1)));
+
+ // Create sort node on the top to check sorted results.
+ SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType, cmp);
+ sortNode.register(minusNode);
+
+ RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+ root.register(sortNode);
+
+ assertTrue(root.hasNext());
+
+ if (all) {
+ Assert.assertArrayEquals(row("Igor", 1), root.next());
+ Assert.assertArrayEquals(row("Igor", 1), root.next());
+ }
+
+ Assert.assertArrayEquals(row("Igor", 2), root.next());
+ Assert.assertArrayEquals(row("Roman", 2), root.next());
+
+ assertFalse(root.hasNext());
+ }
+
+ /** */
+ protected RowHandler.RowFactory<Object[]> rowFactory() {
+ return new RowHandler.RowFactory<Object[]>() {
+ /** */
+ @Override public RowHandler<Object[]> handler() {
+ return ArrayRowHandler.INSTANCE;
+ }
+
+ /** */
+ @Override public Object[] create() {
+ throw new AssertionError();
+ }
+
+ /** */
+ @Override public Object[] create(Object... fields) {
+ return fields;
+ }
+ };
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java
new file mode 100644
index 0000000..c9cb0cc
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Predicate;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to verify EXCEPT operator.
+ */
+public class ExceptPlannerTest extends AbstractPlannerTest {
+ /** Public schema. */
+ private IgniteSchema publicSchema;
+
+ /** Last error. */
+ private String lastError;
+
+ /** {@inheritDoc} */
+ @Before
+ @Override public void setup() {
+ super.setup();
+
+ publicSchema = new IgniteSchema("PUBLIC");
+
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ RelDataType type = new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("SALARY", f.createJavaType(Double.class))
+ .build();
+
+ createTable("RANDOM_TBL1", type, IgniteDistributions.random(), null);
+ createTable("RANDOM_TBL2", type, IgniteDistributions.random(), null);
+ createTable("BROADCAST_TBL1", type, IgniteDistributions.broadcast(), null);
+ createTable("BROADCAST_TBL2", type, IgniteDistributions.broadcast(), null);
+ createTable("SINGLE_TBL1", type, IgniteDistributions.single(), null);
+ createTable("SINGLE_TBL2", type, IgniteDistributions.single(), null);
+
+ List<List<UUID>> assignment = Arrays.asList(
+ select(nodes, 0, 1),
+ select(nodes, 1, 2),
+ select(nodes, 2, 0),
+ select(nodes, 0, 1),
+ select(nodes, 1, 2)
+ );
+
+ createTable("AFFINITY_TBL1", type, IgniteDistributions.affinity(0, "Test1", "hash"),
+ assignment);
+
+ createTable("AFFINITY_TBL2", type, IgniteDistributions.affinity(0, "Test2", "hash"),
+ assignment);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptRandom() throws Exception {
+ String sql = "" +
+ "SELECT * FROM random_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM random_tbl2 ";
+
+ assertPlan(sql, isInstanceOf(IgniteReduceMinus.class).and(n -> !n.all)
+ .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ ))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptAllRandom() throws Exception {
+ String sql = "" +
+ "SELECT * FROM random_tbl1 " +
+ "EXCEPT ALL " +
+ "SELECT * FROM random_tbl2 ";
+
+ assertPlan(sql, isInstanceOf(IgniteReduceMinus.class).and(n -> n.all)
+ .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ ))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptBroadcast() throws Exception {
+ String sql = "" +
+ "SELECT * FROM broadcast_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM broadcast_tbl2 ";
+
+ assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+ .and(input(0, isTableScan("broadcast_tbl1")))
+ .and(input(1, isTableScan("broadcast_tbl2")))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptSingle() throws Exception {
+ String sql = "" +
+ "SELECT * FROM single_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM single_tbl2 ";
+
+ assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+ .and(input(0, isTableScan("single_tbl1")))
+ .and(input(1, isTableScan("single_tbl2"))));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptSingleAndRandom() throws Exception {
+ String sql = "" +
+ "SELECT * FROM single_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM random_tbl1 ";
+
+ assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+ .and(input(0, isTableScan("single_tbl1")))
+ .and(input(1, hasChildThat(isTableScan("random_tbl1")))));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptSingleAndAffinity() throws Exception {
+ String sql = "" +
+ "SELECT * FROM single_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM affinity_tbl1 ";
+
+ assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+ .and(input(0, isTableScan("single_tbl1")))
+ .and(input(1, hasChildThat(isTableScan("affinity_tbl1")))));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptSingleAndBroadcast() throws Exception {
+ String sql = "" +
+ "SELECT * FROM single_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM broadcast_tbl1 ";
+
+ assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+ .and(input(0, isTableScan("single_tbl1")))
+ .and(input(1, isTableScan("broadcast_tbl1")))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptAffinity() throws Exception {
+ String sql = "" +
+ "SELECT * FROM affinity_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM affinity_tbl2 ";
+
+ assertPlan(sql, isInstanceOf(IgniteReduceMinus.class)
+ .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ .and(input(0, isTableScan("affinity_tbl1")))
+ .and(input(1, isTableScan("affinity_tbl2")))
+ ))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptBroadcastAndRandom() throws Exception {
+ String sql = "" +
+ "SELECT * FROM random_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM broadcast_tbl1 ";
+
+ assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+ .and(input(0, hasChildThat(isTableScan("random_tbl1"))))
+ .and(input(1, isTableScan("broadcast_tbl1")))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptRandomNested() throws Exception {
+ String sql = "" +
+ "SELECT * FROM random_tbl2 EXCEPT (" +
+ " SELECT * FROM random_tbl1 " +
+ " EXCEPT " +
+ " SELECT * FROM random_tbl2" +
+ ")";
+
+ assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+ .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
+ .and(input(1, isInstanceOf(IgniteReduceMinus.class)
+ .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ ))
+ ))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptBroadcastAndRandomNested() throws Exception {
+ String sql = "" +
+ "SELECT * FROM broadcast_tbl1 EXCEPT (" +
+ " SELECT * FROM random_tbl1 " +
+ " EXCEPT " +
+ " SELECT * FROM random_tbl2" +
+ ")";
+
+ assertPlan(sql, isInstanceOf(IgniteSingleMinus.class)
+ .and(input(0, isTableScan("broadcast_tbl1")))
+ .and(input(1, isInstanceOf(IgniteReduceMinus.class)
+ .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ ))
+ ))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptMerge() throws Exception {
+ String sql = "" +
+ "SELECT * FROM random_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM random_tbl2 " +
+ "EXCEPT " +
+ "SELECT * FROM affinity_tbl1 " +
+ "EXCEPT " +
+ "SELECT * FROM affinity_tbl2 ";
+
+ assertPlan(sql, isInstanceOf(IgniteReduceMinus.class)
+ .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ .and(input(2, isTableScan("affinity_tbl1")))
+ .and(input(3, isTableScan("affinity_tbl2")))
+ ))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptAllMerge() throws Exception {
+ String sql = "" +
+ "SELECT * FROM random_tbl1 " +
+ "EXCEPT ALL " +
+ "SELECT * FROM random_tbl2 " +
+ "EXCEPT ALL " +
+ "SELECT * FROM affinity_tbl1 " +
+ "EXCEPT ALL " +
+ "SELECT * FROM affinity_tbl2 ";
+
+ assertPlan(sql, isInstanceOf(IgniteReduceMinus.class).and(n -> n.all)
+ .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ .and(input(2, isTableScan("affinity_tbl1")))
+ .and(input(3, isTableScan("affinity_tbl2")))
+ ))
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExceptAllWithExceptMerge() throws Exception {
+ String sql = "" +
+ "SELECT * FROM random_tbl1 " +
+ "EXCEPT ALL " +
+ "SELECT * FROM random_tbl2 " +
+ "EXCEPT " +
+ "SELECT * FROM affinity_tbl1 ";
+
+ assertPlan(sql, isInstanceOf(IgniteReduceMinus.class).and(n -> !n.all)
+ .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ .and(input(2, isTableScan("affinity_tbl1")))
+ ))
+ );
+ }
+
+ /** */
+ private <T extends RelNode> void assertPlan(String sql, Predicate<T> predicate) throws Exception {
+ IgniteRel plan = physicalPlan(sql, publicSchema);
+
+ if (!predicate.test((T)plan)) {
+ String invalidPlanMsg = "Invalid plan (" + lastError + "):\n" +
+ RelOptUtil.toString(plan, SqlExplainLevel.ALL_ATTRIBUTES);
+
+ fail(invalidPlanMsg);
+ }
+ }
+
+ /** */
+ private <T extends RelNode> Predicate<T> isInstanceOf(Class<T> cls) {
+ return node -> {
+ if (cls.isInstance(node))
+ return true;
+
+ lastError = "Unexpected node class [node=" + node + ", cls=" + cls.getSimpleName() + ']';
+
+ return false;
+ };
+ }
+
+ /** */
+ private <T extends RelNode> Predicate<IgniteTableScan> isTableScan(String tableName) {
+ return isInstanceOf(IgniteTableScan.class).and(
+ n -> {
+ String scanTableName = n.getTable().unwrap(TestTable.class).name();
+
+ if (tableName.equalsIgnoreCase(scanTableName))
+ return true;
+
+ lastError = "Unexpected table name [exp=" + tableName + ", act=" + scanTableName + ']';
+
+ return false;
+ });
+ }
+
+ /** */
+ private <T extends RelNode> Predicate<RelNode> hasChildThat(Predicate<T> predicate) {
+ return new Predicate<RelNode>() {
+ public boolean checkRecursively(RelNode node) {
+ if (predicate.test((T)node))
+ return true;
+
+ for (RelNode input : node.getInputs()) {
+ if (checkRecursively(input))
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override public boolean test(RelNode node) {
+ for (RelNode input : node.getInputs()) {
+ if (checkRecursively(input))
+ return true;
+ }
+
+ lastError = "Not found child for defined condition [node=" + node + ']';
+
+ return false;
+ }
+ };
+ }
+
+ /** */
+ private <T extends RelNode> Predicate<RelNode> input(Predicate<T> predicate) {
+ return node -> {
+ if (F.isEmpty(node.getInputs())) {
+ lastError = "No inputs for node [node=" + node + ']';
+
+ return false;
+ }
+
+ return predicate.test((T)node.getInput(0));
+ };
+ }
+
+ /** */
+ private <T extends RelNode> Predicate<RelNode> input(int idx, Predicate<T> predicate) {
+ return node -> {
+ if (F.size(node.getInputs()) <= idx) {
+ lastError = "No input for node [idx=" + idx + ", node=" + node + ']';
+
+ return false;
+ }
+
+ return predicate.test((T)node.getInput(idx));
+ };
+ }
+
+ /** */
+ private void createTable(String name, RelDataType type, IgniteDistribution distr, List<List<UUID>> assignment) {
+ TestTable table = new TestTable(type) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ if (F.isEmpty(assignment))
+ return super.colocationGroup(ctx);
+ else
+ return ColocationGroup.forAssignments(assignment);
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return distr;
+ }
+
+ @Override public String name() {
+ return name;
+ }
+ };
+
+ publicSchema.addTable(name, table);
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index 2ab4fcb..185617b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregat
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateSingleGroupExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashIndexSpoolExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
@@ -45,6 +46,7 @@ import org.junit.runners.Suite;
HashAggregateExecutionTest.class,
HashAggregateSingleGroupExecutionTest.class,
SortAggregateExecutionTest.class,
+ MinusExecutionTest.class
})
public class ExecutionTestSuite {
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index a6d7767..c466f72 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
@@ -45,6 +46,7 @@ import org.junit.runners.Suite;
HashAggregatePlannerTest.class,
SortAggregatePlannerTest.class,
JoinColocationPlannerTest.class,
+ ExceptPlannerTest.class
})
public class PlannerTestSuite {
}