You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/06/04 14:06:31 UTC
[ignite] branch sql-calcite updated: IGNITE-14638 Support for
INTERSECT operator - Fixes #9095.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new de76397 IGNITE-14638 Support for INTERSECT operator - Fixes #9095.
de76397 is described below
commit de76397d33fda782512e0b4c034728877c455b8d
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Jun 4 17:04:55 2021 +0300
IGNITE-14638 Support for INTERSECT operator - Fixes #9095.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../query/calcite/exec/ExecutionServiceImpl.java | 1 +
.../query/calcite/exec/LogicalRelImplementor.java | 35 +-
.../rel/{MinusNode.java => AbstractSetOpNode.java} | 135 +++----
.../query/calcite/exec/rel/IntersectNode.java | 118 ++++++
.../query/calcite/exec/rel/MinusNode.java | 330 ++--------------
.../query/calcite/metadata/IgniteMdRowCount.java | 6 +-
.../processors/query/calcite/prepare/Cloner.java | 16 +-
.../query/calcite/prepare/IgniteRelShuttle.java | 23 +-
.../query/calcite/prepare/PlannerPhase.java | 9 +-
.../query/calcite/rel/IgniteRelVisitor.java | 16 +-
.../query/calcite/rel/IgniteTableFunctionScan.java | 12 +
.../query/calcite/rel/set/IgniteIntersect.java | 67 ++++
.../IgniteMapIntersect.java} | 58 ++-
.../query/calcite/rel/set/IgniteMapMinus.java | 74 +---
.../{IgniteMapMinus.java => IgniteMapSetOp.java} | 86 ++---
.../set/{IgniteMinusBase.java => IgniteMinus.java} | 46 +--
...ReduceMinus.java => IgniteReduceIntersect.java} | 64 +---
.../query/calcite/rel/set/IgniteReduceMinus.java | 50 +--
.../query/calcite/rel/set/IgniteReduceSetOp.java | 74 ++++
.../set/{IgniteMinusBase.java => IgniteSetOp.java} | 55 +--
.../calcite/rel/set/IgniteSingleIntersect.java | 67 ++++
.../query/calcite/rel/set/IgniteSingleMinus.java | 71 +---
...niteSingleMinus.java => IgniteSingleSetOp.java} | 62 +--
.../query/calcite/rule/MinusConverterRule.java | 94 -----
.../query/calcite/rule/SetOpConverterRule.java | 185 +++++++++
.../query/calcite/CalciteQueryProcessorTest.java | 223 -----------
...onTest.java => AbstractSetOpExecutionTest.java} | 109 ++----
.../calcite/exec/rel/IntersectExecutionTest.java | 82 ++++
.../query/calcite/exec/rel/MinusExecutionTest.java | 204 ++--------
.../integration/AbstractBasicIntegrationTest.java | 2 +-
.../calcite/integration/SetOpIntegrationTest.java | 425 +++++++++++++++++++++
.../query/calcite/planner/AbstractPlannerTest.java | 2 +
...xceptPlannerTest.java => SetOpPlannerTest.java} | 265 ++++++++-----
...tionTest.java => TableFunctionPlannerTest.java} | 2 +-
.../ignite/testsuites/ExecutionTestSuite.java | 2 +
.../ignite/testsuites/IntegrationTestSuite.java | 2 +
.../apache/ignite/testsuites/PlannerTestSuite.java | 8 +-
37 files changed, 1511 insertions(+), 1569 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 330e8c2..c161e89 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -552,6 +552,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
case VALUES:
case UNION:
case EXCEPT:
+ case INTERSECT:
return prepareQuery(sqlNode, ctx);
case INSERT:
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index dc559f5..70480b9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -26,7 +26,9 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.core.Spool;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
@@ -37,11 +39,13 @@ import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFa
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractSetOpNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IntersectNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusNode;
@@ -85,10 +89,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMinusBase;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
@@ -432,29 +433,21 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
- @Override public Node<Row> visit(IgniteSingleMinus rel) {
- return visit(rel, AggregateType.SINGLE);
- }
-
- /** {@inheritDoc} */
- @Override public Node<Row> visit(IgniteMapMinus rel) {
- return visit(rel, AggregateType.MAP);
- }
-
- /** {@inheritDoc} */
- @Override public Node<Row> visit(IgniteReduceMinus rel) {
- return visit(rel, AggregateType.REDUCE);
- }
-
- /** Visit IgniteMinus rel. */
- private Node<Row> visit(IgniteMinusBase rel, AggregateType aggType) {
+ @Override public Node<Row> visit(IgniteSetOp rel) {
RelDataType rowType = rel.getRowType();
RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
List<Node<Row>> inputs = Commons.transform(rel.getInputs(), this::visit);
- MinusNode<Row> node = new MinusNode<>(ctx, rowType, aggType, rel.all, rowFactory);
+ AbstractSetOpNode<Row> node;
+
+ if (rel instanceof Minus)
+ node = new MinusNode<>(ctx, rowType, rel.aggregateType(), rel.all(), rowFactory);
+ else if (rel instanceof Intersect)
+ node = new IntersectNode<>(ctx, rowType, rel.aggregateType(), rel.all(), rowFactory, rel.getInputs().size());
+ else
+ throw new AssertionError();
node.register(inputs);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
similarity index 76%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
index 72c8025..6e60a88 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
@@ -32,20 +32,14 @@ import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey
import org.apache.ignite.internal.util.typedef.F;
/**
- * Execution node for MINUS (EXCEPT) operator.
+ * Abstract execution node for set operators (EXCEPT, INTERSECT).
*/
-public class MinusNode<Row> extends AbstractNode<Row> {
+public abstract class AbstractSetOpNode<Row> extends AbstractNode<Row> {
/** */
private final AggregateType type;
/** */
- private final boolean all;
-
- /** */
- private final RowFactory<Row> rowFactory;
-
- /** */
- private final Grouping grouping;
+ private final Grouping<Row> grouping;
/** */
private int requested;
@@ -59,18 +53,13 @@ public class MinusNode<Row> extends AbstractNode<Row> {
/** */
private boolean inLoop;
- /**
- * @param ctx Execution context.
- */
- public MinusNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
- RowFactory<Row> rowFactory) {
+ /** */
+ protected AbstractSetOpNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+ RowFactory<Row> rowFactory, Grouping<Row> grouping) {
super(ctx, rowType);
- this.all = all;
this.type = type;
- this.rowFactory = rowFactory;
-
- grouping = new Grouping();
+ this.grouping = grouping;
}
/** {@inheritDoc} */
@@ -112,6 +101,8 @@ public class MinusNode<Row> extends AbstractNode<Row> {
checkState();
+ grouping.endOfSet(idx);
+
if (type == AggregateType.SINGLE && grouping.isEmpty())
curSrcIdx = sources().size(); // Skip subsequent sources.
else
@@ -137,15 +128,15 @@ public class MinusNode<Row> extends AbstractNode<Row> {
@Override protected Downstream<Row> requestDownstream(int idx) {
return new Downstream<Row>() {
@Override public void push(Row row) throws Exception {
- MinusNode.this.push(row, idx);
+ AbstractSetOpNode.this.push(row, idx);
}
@Override public void end() throws Exception {
- MinusNode.this.end(idx);
+ AbstractSetOpNode.this.end(idx);
}
@Override public void onError(Throwable e) {
- MinusNode.this.onError(e);
+ AbstractSetOpNode.this.onError(e);
}
};
}
@@ -164,13 +155,12 @@ public class MinusNode<Row> extends AbstractNode<Row> {
inLoop = true;
try {
- while (requested > 0 && !grouping.isEmpty()) {
+ if (requested > 0 && !grouping.isEmpty()) {
int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
for (Row row : grouping.getRows(toSnd)) {
- checkState();
-
requested--;
+
downstream().push(row);
processed++;
@@ -196,19 +186,31 @@ public class MinusNode<Row> extends AbstractNode<Row> {
}
/** */
- private class Grouping {
- /**
- * Value in this map will always have 2 elements, first - count of keys in the first set, second - count of
- * keys in all sets except first.
- */
- private final Map<GroupKey, int[]> groups = new HashMap<>();
+ protected abstract static class Grouping<Row> {
+ /** */
+ protected final Map<GroupKey, int[]> groups = new HashMap<>();
+
+ /** */
+ protected final RowHandler<Row> hnd;
+
+ /** */
+ protected final AggregateType type;
/** */
- private final RowHandler<Row> hnd;
+ protected final boolean all;
/** */
- private Grouping() {
- hnd = context().rowHandler();
+ protected final RowFactory<Row> rowFactory;
+
+ /** Processed rows count in current set. */
+ protected int rowsCnt = 0;
+
+ /** */
+ protected Grouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type, boolean all) {
+ hnd = ctx.rowHandler();
+ this.type = type;
+ this.all = all;
+ this.rowFactory = rowFactory;
}
/** */
@@ -222,6 +224,8 @@ public class MinusNode<Row> extends AbstractNode<Row> {
addOnMapper(row, setIdx);
else
addOnSingle(row, setIdx);
+
+ rowsCnt++;
}
/**
@@ -239,7 +243,7 @@ public class MinusNode<Row> extends AbstractNode<Row> {
}
/** */
- private GroupKey key(Row row) {
+ protected GroupKey key(Row row) {
int size = hnd.columnCount(row);
Object[] fields = new Object[size];
@@ -251,49 +255,31 @@ public class MinusNode<Row> extends AbstractNode<Row> {
}
/** */
- private void addOnSingle(Row row, int setIdx) {
- int[] cntrs;
-
- GroupKey key = key(row);
-
- if (setIdx == 0) {
- cntrs = groups.computeIfAbsent(key, k -> new int[2]);
-
- cntrs[0]++;
- }
- else {
- cntrs = groups.get(key);
-
- if (cntrs != null) {
- cntrs[1]++;
-
- if (cntrs[1] >= cntrs[0])
- groups.remove(key);
- }
- }
+ protected void endOfSet(int setIdx) {
+ rowsCnt = 0;
}
/** */
- private void addOnMapper(Row row, int setIdx) {
- int[] cntrs = groups.computeIfAbsent(key(row), k -> new int[2]);
+ protected abstract void addOnSingle(Row row, int setIdx);
- cntrs[setIdx == 0 ? 0 : 1]++;
- }
+ /** */
+ protected abstract void addOnMapper(Row row, int setIdx);
/** */
- private void addOnReducer(Row row) {
+ protected void addOnReducer(Row row) {
GroupKey grpKey = (GroupKey)hnd.get(0, row);
+ int[] cntrsMap = (int[])hnd.get(1, row);
- int[] cntrs = groups.computeIfAbsent(grpKey, k -> new int[2]);
+ int[] cntrs = groups.computeIfAbsent(grpKey, k -> new int[cntrsMap.length]);
- int[] cntrsMap = (int[])hnd.get(1, row);
+ assert cntrs.length == cntrsMap.length;
for (int i = 0; i < cntrsMap.length; i++)
cntrs[i] += cntrsMap[i];
}
/** */
- private List<Row> getOnMapper(int cnt) {
+ protected List<Row> getOnMapper(int cnt) {
Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
int amount = Math.min(cnt, groups.size());
@@ -303,7 +289,7 @@ public class MinusNode<Row> extends AbstractNode<Row> {
Map.Entry<GroupKey, int[]> entry = it.next();
// Skip row if it doesn't affect the final result.
- if (entry.getValue()[0] != entry.getValue()[1]) {
+ if (affectResult(entry.getValue())) {
res.add(rowFactory.create(entry.getKey(), entry.getValue()));
amount--;
@@ -316,7 +302,7 @@ public class MinusNode<Row> extends AbstractNode<Row> {
}
/** */
- private List<Row> getOnSingleOrReducer(int cnt) {
+ protected List<Row> getOnSingleOrReducer(int cnt) {
Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
List<Row> res = new ArrayList<>(cnt);
@@ -360,23 +346,16 @@ public class MinusNode<Row> extends AbstractNode<Row> {
return res;
}
- /** */
- private int availableRows(int[] cntrs) {
- assert cntrs.length == 2;
-
- if (all)
- return Math.max(cntrs[0] - cntrs[1], 0);
- else
- return cntrs[1] == 0 ? 1 : 0;
- }
+ /**
+ * Return {@code true} if counters affect the final result, or {@code false} if row can be skipped.
+ */
+ protected abstract boolean affectResult(int[] cntrs);
/** */
- private void decrementAvailableRows(int[] cntrs, int amount) {
- assert amount > 0;
- assert all;
+ protected abstract int availableRows(int[] cntrs);
- cntrs[0] -= amount;
- }
+ /** */
+ protected abstract void decrementAvailableRows(int[] cntrs, int amount);
/** */
private boolean isEmpty() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
new file mode 100644
index 0000000..15d4bbf
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+
+/**
+ * Execution node for INTERSECT operator.
+ */
+public class IntersectNode<Row> extends AbstractSetOpNode<Row> {
+ /** */
+ public IntersectNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+ RowFactory<Row> rowFactory, int inputsCnt) {
+ super(ctx, rowType, type, all, rowFactory, new IntersectGrouping<>(ctx, rowFactory, type, all, inputsCnt));
+ }
+
+ /** */
+ private static class IntersectGrouping<Row> extends Grouping<Row> {
+ /** Inputs count. */
+ private final int inputsCnt;
+
+ /** */
+ private IntersectGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type,
+ boolean all, int inputsCnt) {
+ super(ctx, rowFactory, type, all);
+
+ this.inputsCnt = inputsCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void endOfSet(int setIdx) {
+ if (type == AggregateType.SINGLE && rowsCnt == 0)
+ groups.clear();
+
+ super.endOfSet(setIdx);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void addOnSingle(Row row, int setIdx) {
+ int[] cntrs;
+
+ GroupKey key = key(row);
+
+ if (setIdx == 0) {
+ cntrs = groups.computeIfAbsent(key, k -> new int[inputsCnt]);
+
+ cntrs[0]++;
+ }
+ else {
+ cntrs = groups.get(key);
+
+ if (cntrs != null) {
+ if (cntrs[setIdx - 1] == 0)
+ groups.remove(key);
+ else
+ cntrs[setIdx]++;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void addOnMapper(Row row, int setIdx) {
+ int[] cntrs = groups.computeIfAbsent(key(row), k -> new int[inputsCnt]);
+
+ cntrs[setIdx]++;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean affectResult(int[] cntrs) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int availableRows(int[] cntrs) {
+ int cnt = cntrs[0];
+
+ for (int i = 1; i < cntrs.length; i++) {
+ if (cntrs[i] < cnt)
+ cnt = cntrs[i];
+ }
+
+ if (all) {
+ cntrs[0] = cnt; // With this we can decrement only the first element to get the same result.
+
+ return cnt;
+ }
+ else
+ return cnt == 0 ? 0 : 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void decrementAvailableRows(int[] cntrs, int amount) {
+ assert amount > 0;
+ assert all;
+
+ cntrs[0] -= amount;
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
index 72c8025..056fbbb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
@@ -17,251 +17,43 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.util.typedef.F;
/**
* Execution node for MINUS (EXCEPT) operator.
*/
-public class MinusNode<Row> extends AbstractNode<Row> {
+public class MinusNode<Row> extends AbstractSetOpNode<Row> {
/** */
- private final AggregateType type;
-
- /** */
- private final boolean all;
-
- /** */
- private final RowFactory<Row> rowFactory;
-
- /** */
- private final Grouping grouping;
-
- /** */
- private int requested;
-
- /** */
- private int waiting;
-
- /** Current source index. */
- private int curSrcIdx;
-
- /** */
- private boolean inLoop;
-
- /**
- * @param ctx Execution context.
- */
public MinusNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
RowFactory<Row> rowFactory) {
- super(ctx, rowType);
-
- this.all = all;
- this.type = type;
- this.rowFactory = rowFactory;
-
- grouping = new Grouping();
- }
-
- /** {@inheritDoc} */
- @Override public void request(int rowsCnt) throws Exception {
- assert !F.isEmpty(sources());
- assert rowsCnt > 0 && requested == 0;
- assert waiting <= 0;
-
- checkState();
-
- requested = rowsCnt;
-
- if (waiting == 0)
- sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
- else if (!inLoop)
- context().execute(this::flush, this::onError);
- }
-
- /** */
- public void push(Row row, int idx) throws Exception {
- assert downstream() != null;
- assert waiting > 0;
-
- checkState();
-
- waiting--;
-
- grouping.add(row, idx);
-
- if (waiting == 0)
- sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
- }
-
- /** */
- public void end(int idx) throws Exception {
- assert downstream() != null;
- assert waiting > 0;
- assert curSrcIdx == idx;
-
- checkState();
-
- if (type == AggregateType.SINGLE && grouping.isEmpty())
- curSrcIdx = sources().size(); // Skip subsequent sources.
- else
- curSrcIdx++;
-
- if (curSrcIdx >= sources().size()) {
- waiting = -1;
-
- flush();
- }
- else
- sources().get(curSrcIdx).request(waiting);
- }
-
- /** {@inheritDoc} */
- @Override protected void rewindInternal() {
- requested = 0;
- waiting = 0;
- grouping.groups.clear();
- }
-
- /** {@inheritDoc} */
- @Override protected Downstream<Row> requestDownstream(int idx) {
- return new Downstream<Row>() {
- @Override public void push(Row row) throws Exception {
- MinusNode.this.push(row, idx);
- }
-
- @Override public void end() throws Exception {
- MinusNode.this.end(idx);
- }
-
- @Override public void onError(Throwable e) {
- MinusNode.this.onError(e);
- }
- };
- }
-
- /** */
- private void flush() throws Exception {
- if (isClosed())
- return;
-
- checkState();
-
- assert waiting == -1;
-
- int processed = 0;
-
- inLoop = true;
-
- try {
- while (requested > 0 && !grouping.isEmpty()) {
- int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
-
- for (Row row : grouping.getRows(toSnd)) {
- checkState();
-
- requested--;
- downstream().push(row);
-
- processed++;
- }
-
- if (processed >= IN_BUFFER_SIZE && requested > 0) {
- // Allow others to do their job.
- context().execute(this::flush, this::onError);
-
- return;
- }
- }
- }
- finally {
- inLoop = false;
- }
-
- if (requested > 0) {
- requested = 0;
-
- downstream().end();
- }
+ super(ctx, rowType, type, all, rowFactory, new MinusGrouping<>(ctx, rowFactory, type, all));
}
/** */
- private class Grouping {
- /**
- * Value in this map will always have 2 elements, first - count of keys in the first set, second - count of
- * keys in all sets except first.
- */
- private final Map<GroupKey, int[]> groups = new HashMap<>();
-
- /** */
- private final RowHandler<Row> hnd;
-
- /** */
- private Grouping() {
- hnd = context().rowHandler();
- }
-
- /** */
- private void add(Row row, int setIdx) {
- if (type == AggregateType.REDUCE) {
- assert setIdx == 0 : "Unexpected set index: " + setIdx;
-
- addOnReducer(row);
- }
- else if (type == AggregateType.MAP)
- addOnMapper(row, setIdx);
- else
- addOnSingle(row, setIdx);
- }
-
- /**
- * @param cnt Number of rows.
- *
- * @return Actually sent rows number.
- */
- private List<Row> getRows(int cnt) {
- if (F.isEmpty(groups))
- return Collections.emptyList();
- else if (type == AggregateType.MAP)
- return getOnMapper(cnt);
- else
- return getOnSingleOrReducer(cnt);
- }
-
+ private static class MinusGrouping<Row> extends Grouping<Row> {
/** */
- private GroupKey key(Row row) {
- int size = hnd.columnCount(row);
-
- Object[] fields = new Object[size];
-
- for (int i = 0; i < size; i++)
- fields[i] = hnd.get(i, row);
-
- return new GroupKey(fields);
+ private MinusGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type, boolean all) {
+ super(ctx, rowFactory, type, all);
}
- /** */
- private void addOnSingle(Row row, int setIdx) {
+ /** {@inheritDoc} */
+ @Override protected void addOnSingle(Row row, int setIdx) {
int[] cntrs;
GroupKey key = key(row);
if (setIdx == 0) {
+ // Value in the map will always have 2 elements, first - count of keys in the first set,
+ // second - count of keys in all sets except first.
cntrs = groups.computeIfAbsent(key, k -> new int[2]);
cntrs[0]++;
}
- else {
+ else if (all) {
cntrs = groups.get(key);
if (cntrs != null) {
@@ -271,97 +63,26 @@ public class MinusNode<Row> extends AbstractNode<Row> {
groups.remove(key);
}
}
+ else
+ groups.remove(key);
}
- /** */
- private void addOnMapper(Row row, int setIdx) {
+ /** {@inheritDoc} */
+ @Override protected void addOnMapper(Row row, int setIdx) {
+ // Value in the map will always have 2 elements, first - count of keys in the first set,
+ // second - count of keys in all sets except first.
int[] cntrs = groups.computeIfAbsent(key(row), k -> new int[2]);
cntrs[setIdx == 0 ? 0 : 1]++;
}
- /** */
- private void addOnReducer(Row row) {
- GroupKey grpKey = (GroupKey)hnd.get(0, row);
-
- int[] cntrs = groups.computeIfAbsent(grpKey, k -> new int[2]);
-
- int[] cntrsMap = (int[])hnd.get(1, row);
-
- for (int i = 0; i < cntrsMap.length; i++)
- cntrs[i] += cntrsMap[i];
- }
-
- /** */
- private List<Row> getOnMapper(int cnt) {
- Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
-
- int amount = Math.min(cnt, groups.size());
- List<Row> res = new ArrayList<>(amount);
-
- while (amount > 0 && it.hasNext()) {
- Map.Entry<GroupKey, int[]> entry = it.next();
-
- // Skip row if it doesn't affect the final result.
- if (entry.getValue()[0] != entry.getValue()[1]) {
- res.add(rowFactory.create(entry.getKey(), entry.getValue()));
-
- amount--;
- }
-
- it.remove();
- }
-
- return res;
- }
-
- /** */
- private List<Row> getOnSingleOrReducer(int cnt) {
- Iterator<Map.Entry<GroupKey, int[]>> it = groups.entrySet().iterator();
-
- List<Row> res = new ArrayList<>(cnt);
-
- while (it.hasNext() && cnt > 0) {
- Map.Entry<GroupKey, int[]> entry = it.next();
-
- GroupKey key = entry.getKey();
-
- Object[] fields = new Object[key.fieldsCount()];
-
- for (int i = 0; i < fields.length; i++)
- fields[i] = key.field(i);
-
- Row row = rowFactory.create(fields);
-
- int[] cntrs = entry.getValue();
-
- int availableRows = availableRows(entry.getValue());
-
- if (availableRows <= cnt) {
- it.remove();
-
- if (availableRows == 0)
- continue;
-
- cnt -= availableRows;
- }
- else {
- availableRows = cnt;
-
- decrementAvailableRows(cntrs, availableRows);
-
- cnt = 0;
- }
-
- for (int i = 0; i < availableRows; i++)
- res.add(row);
- }
-
- return res;
+ /** {@inheritDoc} */
+ @Override protected boolean affectResult(int[] cntrs) {
+ return cntrs[0] != cntrs[1];
}
- /** */
- private int availableRows(int[] cntrs) {
+ /** {@inheritDoc} */
+ @Override protected int availableRows(int[] cntrs) {
assert cntrs.length == 2;
if (all)
@@ -370,17 +91,12 @@ public class MinusNode<Row> extends AbstractNode<Row> {
return cntrs[1] == 0 ? 1 : 0;
}
- /** */
- private void decrementAvailableRows(int[] cntrs, int amount) {
+ /** {@inheritDoc} */
+ @Override protected void decrementAvailableRows(int[] cntrs, int amount) {
assert amount > 0;
assert all;
cntrs[0] -= amount;
}
-
- /** */
- private boolean isEmpty() {
- return groups.isEmpty();
- }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 67c4b23..102d969 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -32,7 +32,7 @@ import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMinusBase;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
@@ -118,9 +118,9 @@ public class IgniteMdRowCount extends RelMdRowCount {
}
/**
- * Estimation of row count for MINUS (EXCEPT) operator.
+ * Estimation of row count for set op (MINUS, INTERSECT).
*/
- public double getRowCount(IgniteMinusBase rel, RelMetadataQuery mq) {
+ public double getRowCount(IgniteSetOp rel, RelMetadataQuery mq) {
return rel.estimateRowCount(mq);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index e5f88c4..6241243 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -47,9 +47,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
@@ -230,17 +228,7 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSingleMinus rel) {
- return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteMapMinus rel) {
- return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteReduceMinus rel) {
+ @Override public IgniteRel visit(IgniteSetOp rel) {
return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 8f5aec2..6978395 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.List;
-
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -47,9 +46,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/** */
@@ -180,28 +177,18 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteRel rel) {
- return rel.accept(this);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteSingleMinus rel) {
+ @Override public IgniteRel visit(IgniteSetOp rel) {
return processNode(rel);
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteMapMinus rel) {
- return processNode(rel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteReduceMinus rel) {
+ @Override public IgniteRel visit(IgniteTableFunctionScan rel) {
return processNode(rel);
}
/** {@inheritDoc} */
- @Override public IgniteRel visit(IgniteTableFunctionScan rel) {
- return processNode(rel);
+ @Override public IgniteRel visit(IgniteRel rel) {
+ return rel.accept(this);
}
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 722e8f3..f2a479d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -48,9 +48,9 @@ import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMerge
import org.apache.ignite.internal.processors.query.calcite.rule.HashAggregateConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.MergeJoinConverterRule;
-import org.apache.ignite.internal.processors.query.calcite.rule.MinusConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.NestedLoopJoinConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverterRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.SetOpConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.SortConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.TableFunctionScanConverterRule;
@@ -144,6 +144,7 @@ public enum PlannerPhase {
CoreRules.UNION_MERGE,
CoreRules.MINUS_MERGE,
+ CoreRules.INTERSECT_MERGE,
CoreRules.UNION_REMOVE,
CoreRules.JOIN_COMMUTE,
CoreRules.AGGREGATE_REMOVE,
@@ -178,8 +179,10 @@ public enum PlannerPhase {
HashAggregateConverterRule.MAP_REDUCE,
SortAggregateConverterRule.SINGLE,
SortAggregateConverterRule.MAP_REDUCE,
- MinusConverterRule.SINGLE,
- MinusConverterRule.MAP_REDUCE,
+ SetOpConverterRule.SINGLE_MINUS,
+ SetOpConverterRule.MAP_REDUCE_MINUS,
+ SetOpConverterRule.SINGLE_INTERSECT,
+ SetOpConverterRule.MAP_REDUCE_INTERSECT,
ProjectConverterRule.INSTANCE,
FilterConverterRule.INSTANCE,
TableModifyConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 95e5f2f..444f254 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -23,9 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceH
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
/**
* A visitor to traverse an Ignite relational nodes tree.
@@ -159,17 +157,7 @@ public interface IgniteRelVisitor<T> {
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
*/
- T visit(IgniteSingleMinus rel);
-
- /**
- * See {@link IgniteRelVisitor#visit(IgniteRel)}
- */
- T visit(IgniteMapMinus rel);
-
- /**
- * See {@link IgniteRelVisitor#visit(IgniteRel)}
- */
- T visit(IgniteReduceMinus rel);
+ T visit(IgniteSetOp rel);
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
index 0e0fb43..aed4e38 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
@@ -23,6 +23,7 @@ import java.util.Set;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.metadata.RelColumnMapping;
@@ -31,6 +32,8 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.ignite.internal.util.typedef.F;
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
/**
* Relational operator for table function scan.
*/
@@ -50,6 +53,15 @@ public class IgniteTableFunctionScan extends TableFunctionScan implements Ignite
super(cluster, traits, ImmutableList.of(), call, null, rowType, null);
}
+ /**
+ * Constructor used for deserialization.
+ *
+ * @param input Serialized representation.
+ */
+ public IgniteTableFunctionScan(RelInput input) {
+ super(changeTraits(input, IgniteConvention.INSTANCE));
+ }
+
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteTableFunctionScan(cluster, getTraitSet(), getCall(), getRowType());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteIntersect.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteIntersect.java
new file mode 100644
index 0000000..3f876a4
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteIntersect.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+
+/**
+ * Base class for physical INTERSECT set op.
+ */
+public abstract class IgniteIntersect extends Intersect implements IgniteSetOp {
+ /** */
+ IgniteIntersect(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+ super(cluster, traits, inputs, all);
+ }
+
+ /** {@inheritDoc} */
+ protected IgniteIntersect(RelInput input) {
+ super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
+ }
+
+ /** {@inheritDoc} */
+ @Override public double estimateRowCount(RelMetadataQuery mq) {
+ final List<RelNode> inputs = getInputs();
+
+ double rows = mq.getRowCount(inputs.get(0));
+
+ for (int i = 1; i < inputs.size(); i++)
+ rows = 0.5 * Math.min(rows, mq.getRowCount(inputs.get(i)));
+
+ return rows;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return computeSetOpCost(planner, mq);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean all() {
+ return all;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapIntersect.java
similarity index 50%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapIntersect.java
index 0e0fb43..0cc1771 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapIntersect.java
@@ -15,44 +15,45 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rel;
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
-import java.lang.reflect.Type;
import java.util.List;
-import java.util.Set;
-import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableFunctionScan;
-import org.apache.calcite.rel.metadata.RelColumnMapping;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
- * Relational operator for table function scan.
+ * Physical node for MAP phase of INTERSECT operator.
*/
-public class IgniteTableFunctionScan extends TableFunctionScan implements IgniteRel {
- /** Default estimate row count. */
- private static final int ESTIMATE_ROW_COUNT = 100;
-
- /**
- * Creates a TableFunctionScan.
- */
- public IgniteTableFunctionScan(
+public class IgniteMapIntersect extends IgniteIntersect implements IgniteMapSetOp {
+ /** */
+ public IgniteMapIntersect(
RelOptCluster cluster,
- RelTraitSet traits,
- RexNode call,
- RelDataType rowType
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ boolean all
) {
- super(cluster, traits, ImmutableList.of(), call, null, rowType, null);
+ super(cluster, traitSet, inputs, all);
+ }
+
+ /** */
+ public IgniteMapIntersect(RelInput input) {
+ super(input);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteMapIntersect copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new IgniteMapIntersect(getCluster(), traitSet, inputs, all);
}
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteTableFunctionScan(cluster, getTraitSet(), getCall(), getRowType());
+ return new IgniteMapIntersect(cluster, getTraitSet(), Commons.cast(inputs), all);
}
/** {@inheritDoc} */
@@ -61,15 +62,12 @@ public class IgniteTableFunctionScan extends TableFunctionScan implements Ignite
}
/** {@inheritDoc} */
- @Override public TableFunctionScan copy(RelTraitSet traitSet, List<RelNode> inputs, RexNode rexCall,
- Type elementType, RelDataType rowType, Set<RelColumnMapping> columnMappings) {
- assert F.isEmpty(inputs);
-
- return this;
+ @Override protected RelDataType deriveRowType() {
+ return buildRowType();
}
/** {@inheritDoc} */
- @Override public double estimateRowCount(RelMetadataQuery mq) {
- return ESTIMATE_ROW_COUNT;
+ @Override public int aggregateFieldsCount() {
+ return getInput(0).getRowType().getFieldCount() + getInputs().size();
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
index 061cc1c..9bbfa5b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
@@ -18,31 +18,19 @@
package org.apache.ignite.internal.processors.query.calcite.rel.set;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* Physical node for MAP phase of MINUS (EXCEPT) operator.
*/
-public class IgniteMapMinus extends IgniteMinusBase {
+public class IgniteMapMinus extends IgniteMinus implements IgniteMapSetOp {
/** */
public IgniteMapMinus(
RelOptCluster cluster,
@@ -75,67 +63,11 @@ public class IgniteMapMinus extends IgniteMinusBase {
/** {@inheritDoc} */
@Override protected RelDataType deriveRowType() {
- RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
-
- assert typeFactory instanceof IgniteTypeFactory;
-
- RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
-
- builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
- builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
-
- return builder.build();
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- boolean rewindable = inputTraits.stream()
- .map(TraitUtils::rewindability)
- .allMatch(RewindabilityTrait::rewindable);
-
- if (rewindable)
- return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
- Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
- return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
-
- if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
- return ImmutableList.of(); // Mixing of single and random is prohibited.
-
- return ImmutableList.of(
- Pair.of(nodeTraits.replace(IgniteDistributions.random()), Commons.transform(inputTraits,
- t -> t.replace(IgniteDistributions.random())))
- );
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits
- ) {
- Set<CorrelationId> correlationIds = inTraits.stream()
- .map(TraitUtils::correlation)
- .flatMap(corrTr -> corrTr.correlationIds().stream())
- .collect(Collectors.toSet());
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(correlationIds)),
- inTraits));
+ return buildRowType();
}
/** {@inheritDoc} */
- @Override protected int aggregateFieldsCount() {
+ @Override public int aggregateFieldsCount() {
return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
similarity index 66%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
index 061cc1c..f38ec09 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
@@ -21,17 +21,13 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
@@ -40,55 +36,11 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
- * Physical node for MAP phase of MINUS (EXCEPT) operator.
+ * Common interface for physical nodes of the MAP phase of set operations (MINUS, INTERSECT).
*/
-public class IgniteMapMinus extends IgniteMinusBase {
- /** */
- public IgniteMapMinus(
- RelOptCluster cluster,
- RelTraitSet traitSet,
- List<RelNode> inputs,
- boolean all
- ) {
- super(cluster, traitSet, inputs, all);
- }
-
- /** */
- public IgniteMapMinus(RelInput input) {
- super(input);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteMapMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new IgniteMapMinus(getCluster(), traitSet, inputs, all);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteMapMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
- }
-
- /** {@inheritDoc} */
- @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
- return visitor.visit(this);
- }
-
- /** {@inheritDoc} */
- @Override protected RelDataType deriveRowType() {
- RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
-
- assert typeFactory instanceof IgniteTypeFactory;
-
- RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
-
- builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
- builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
-
- return builder.build();
- }
-
+public interface IgniteMapSetOp extends IgniteSetOp {
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
) {
@@ -104,24 +56,24 @@ public class IgniteMapMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
) {
if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
- if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
- return ImmutableList.of(); // Mixing of single and random is prohibited.
-
return ImmutableList.of(
Pair.of(nodeTraits.replace(IgniteDistributions.random()), Commons.transform(inputTraits,
- t -> t.replace(IgniteDistributions.random())))
+ t -> TraitUtils.distribution(t) == IgniteDistributions.broadcast() ?
+ // Allow broadcast with trim-exchange to be used in map-reduce set-op.
+ t.replace(IgniteDistributions.hash(ImmutableList.of(0))) :
+ t.replace(IgniteDistributions.random())))
);
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
RelTraitSet nodeTraits,
List<RelTraitSet> inTraits
) {
@@ -134,8 +86,22 @@ public class IgniteMapMinus extends IgniteMinusBase {
inTraits));
}
+ /** Builds the row type of a MAP node: a GROUP_KEY column and an int[] COUNTERS column. */
+ public default RelDataType buildRowType() {
+ RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
+
+ assert typeFactory instanceof IgniteTypeFactory;
+
+ RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+
+ builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
+ builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
+
+ return builder.build();
+ }
+
/** {@inheritDoc} */
- @Override protected int aggregateFieldsCount() {
- return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
+ @Override public default AggregateType aggregateType() {
+ return AggregateType.MAP;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinus.java
similarity index 53%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinus.java
index b2cf77b..1a57a4b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinus.java
@@ -18,59 +18,35 @@
package org.apache.ignite.internal.processors.query.calcite.rel.set;
import java.util.List;
-import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* Base class for physical MINUS (EXCEPT) set op.
*/
-public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgniteRel {
+public abstract class IgniteMinus extends Minus implements IgniteSetOp {
/** Count of counter fields used to aggregate results. */
protected static final int COUNTER_FIELDS_CNT = 2;
/** */
- IgniteMinusBase(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+ IgniteMinus(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
super(cluster, traits, inputs, all);
}
/** {@inheritDoc} */
- protected IgniteMinusBase(RelInput input) {
+ protected IgniteMinus(RelInput input) {
super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
}
/** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Operation erases collation.
- return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY)));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // Operation erases collation.
- return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY))));
- }
-
- /** Gets count of fields for aggregation for this node. Required for memory consumption calculation. */
- protected abstract int aggregateFieldsCount();
-
- /** {@inheritDoc} */
@Override public double estimateRowCount(RelMetadataQuery mq) {
final List<RelNode> inputs = getInputs();
@@ -84,17 +60,11 @@ public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgnite
/** {@inheritDoc} */
@Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
-
- double rows = estimateRowCount(mq);
-
- double inputRows = 0;
-
- for (RelNode input : getInputs())
- inputRows += mq.getRowCount(input);
-
- double mem = 0.5 * inputRows * aggregateFieldsCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+ return computeSetOpCost(planner, mq);
+ }
- return costFactory.makeCost(rows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
+ /** {@inheritDoc} */
+ @Override public boolean all() {
+ return all;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceIntersect.java
similarity index 50%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceIntersect.java
index c068e8e..63a5765 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceIntersect.java
@@ -27,22 +27,16 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
- * Physical node for REDUCE phase of MINUS (EXCEPT) operator.
+ * Physical node for REDUCE phase of INTERSECT operator.
*/
-public class IgniteReduceMinus extends IgniteMinusBase {
+public class IgniteReduceIntersect extends IgniteIntersect implements IgniteReduceSetOp {
/** */
- public IgniteReduceMinus(
+ public IgniteReduceIntersect(
RelOptCluster cluster,
RelTraitSet traitSet,
RelNode input,
@@ -55,7 +49,7 @@ public class IgniteReduceMinus extends IgniteMinusBase {
}
/** */
- public IgniteReduceMinus(RelInput input) {
+ public IgniteReduceIntersect(RelInput input) {
this(
input.getCluster(),
input.getTraitSet().replace(IgniteConvention.INSTANCE),
@@ -74,53 +68,13 @@ public class IgniteReduceMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- return ImmutableList.of(
- Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits) {
- IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
-
- if (IgniteDistributions.single().satisfies(distr)) {
- return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
- Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()),
- ImmutableList.of(sole(inputTraits).replace(IgniteDistributions.single()))));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits
- ) {
- return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
- inTraits));
- }
-
- /** {@inheritDoc} */
@Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new IgniteReduceMinus(getCluster(), traitSet, sole(inputs), all, rowType);
+ return new IgniteReduceIntersect(getCluster(), traitSet, sole(inputs), all, rowType);
}
/** {@inheritDoc} */
- @Override public IgniteReduceMinus clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteReduceMinus(cluster, getTraitSet(), sole(inputs), all, rowType);
+ @Override public IgniteReduceIntersect clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteReduceIntersect(cluster, getTraitSet(), sole(inputs), all, rowType);
}
/** {@inheritDoc} */
@@ -129,7 +83,7 @@ public class IgniteReduceMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override protected int aggregateFieldsCount() {
- return rowType.getFieldCount() + COUNTER_FIELDS_CNT;
+ @Override public int aggregateFieldsCount() {
+ return rowType.getFieldCount() + 2 /* At least two fields required for count aggregation. */;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
index c068e8e..63ca97e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceMinus.java
@@ -27,20 +27,14 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* Physical node for REDUCE phase of MINUS (EXCEPT) operator.
*/
-public class IgniteReduceMinus extends IgniteMinusBase {
+public class IgniteReduceMinus extends IgniteMinus implements IgniteReduceSetOp {
/** */
public IgniteReduceMinus(
RelOptCluster cluster,
@@ -74,46 +68,6 @@ public class IgniteReduceMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- return ImmutableList.of(
- Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits) {
- IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
-
- if (IgniteDistributions.single().satisfies(distr)) {
- return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
- Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()),
- ImmutableList.of(sole(inputTraits).replace(IgniteDistributions.single()))));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits
- ) {
- return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
- inTraits));
- }
-
- /** {@inheritDoc} */
@Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
return new IgniteReduceMinus(getCluster(), traitSet, sole(inputs), all, rowType);
}
@@ -129,7 +83,7 @@ public class IgniteReduceMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override protected int aggregateFieldsCount() {
+ @Override public int aggregateFieldsCount() {
return rowType.getFieldCount() + COUNTER_FIELDS_CNT;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceSetOp.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceSetOp.java
new file mode 100644
index 0000000..c618581
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceSetOp.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for the REDUCE phase of a set op (MINUS, INTERSECT); holds trait derivation shared by reduce nodes.
+ */
+public interface IgniteReduceSetOp extends IgniteSetOp {
+ /** {@inheritDoc} Always yields ONE_WAY for this node; the first input's traits are passed through as is. */
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ return ImmutableList.of(
+ Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY), ImmutableList.of(inputTraits.get(0))));
+ }
+
+ /** {@inheritDoc} Accepts only a requested single distribution (pushed to every input); otherwise returns null. */
+ @Override public default Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ if (TraitUtils.distribution(nodeTraits) == IgniteDistributions.single()) // NOTE(review): identity check; presumably single() is a canonical instance - confirm.
+ return Pair.of(nodeTraits, Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
+
+ return null;
+ }
+
+ /** {@inheritDoc} The only variant offered is single distribution for both this node and its first input. */
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()),
+ ImmutableList.of(inputTraits.get(0).replace(IgniteDistributions.single()))));
+ }
+
+ /** {@inheritDoc} The node takes the correlation trait of its first input; input traits are left unchanged. */
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
+ return ImmutableList.of(Pair.of(nodeTraits.replace(TraitUtils.correlation(inTraits.get(0))),
+ inTraits));
+ }
+
+ /** {@inheritDoc} Reduce nodes always report {@link AggregateType#REDUCE}. */
+ @Override public default AggregateType aggregateType() {
+ return AggregateType.REDUCE;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
similarity index 58%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
index b2cf77b..e085c43 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMinusBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
@@ -19,75 +19,49 @@ package org.apache.ignite.internal.processors.query.calcite.rel.set;
import java.util.List;
import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
- * Base class for physical MINUS (EXCEPT) set op.
+ * Base interface for physical set op node (MINUS, INTERSECT).
*/
-public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgniteRel {
- /** Count of counter fields used to aggregate results. */
- protected static final int COUNTER_FIELDS_CNT = 2;
-
- /** */
- IgniteMinusBase(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
- super(cluster, traits, inputs, all);
- }
+public interface IgniteSetOp extends TraitsAwareIgniteRel {
+ /** ALL flag of set op. */
+ public boolean all();
/** {@inheritDoc} */
- protected IgniteMinusBase(RelInput input) {
- super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ @Override public default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits) {
// Operation erases collation.
return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY)));
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits) {
// Operation erases collation.
return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY))));
}
/** Gets count of fields for aggregation for this node. Required for memory consumption calculation. */
- protected abstract int aggregateFieldsCount();
-
- /** {@inheritDoc} */
- @Override public double estimateRowCount(RelMetadataQuery mq) {
- final List<RelNode> inputs = getInputs();
+ public int aggregateFieldsCount();
- double rows = mq.getRowCount(inputs.get(0));
-
- for (int i = 1; i < inputs.size(); i++)
- rows -= 0.5 * Math.min(rows, mq.getRowCount(inputs.get(i)));
-
- return rows;
- }
-
- /** {@inheritDoc} */
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ /** Compute cost for set op. */
+ public default RelOptCost computeSetOpCost(RelOptPlanner planner, RelMetadataQuery mq) {
IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
- double rows = estimateRowCount(mq);
-
double inputRows = 0;
for (RelNode input : getInputs())
@@ -95,6 +69,9 @@ public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgnite
double mem = 0.5 * inputRows * aggregateFieldsCount() * IgniteCost.AVERAGE_FIELD_SIZE;
- return costFactory.makeCost(rows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
+ return costFactory.makeCost(inputRows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
}
+
+ /** Aggregate type. */
+ public AggregateType aggregateType();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleIntersect.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleIntersect.java
new file mode 100644
index 0000000..1225146
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleIntersect.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for INTERSECT operator whose inputs satisfy SINGLE distribution.
+ */
+public class IgniteSingleIntersect extends IgniteIntersect implements IgniteSingleSetOp {
+ /** Creates the node from explicit cluster, traits, inputs and the ALL flag; all are handed to the superclass. */
+ public IgniteSingleIntersect(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ boolean all
+ ) {
+ super(cluster, traitSet, inputs, all);
+ }
+
+ /** Creates the node from a {@link RelInput} by delegating to the superclass constructor. */
+ public IgniteSingleIntersect(RelInput input) {
+ super(input);
+ }
+
+ /** {@inheritDoc} Returns a new node on the same cluster with the given traits, inputs and ALL flag. */
+ @Override public IgniteSingleIntersect copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new IgniteSingleIntersect(getCluster(), traitSet, inputs, all);
+ }
+
+ /** {@inheritDoc} Keeps this node's trait set and ALL flag; only the cluster and inputs are replaced. */
+ @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteSingleIntersect(cluster, getTraitSet(), Commons.cast(inputs), all);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} Field count of the first input's row type plus one per input (presumably a counter each). */
+ @Override public int aggregateFieldsCount() {
+ return getInput(0).getRowType().getFieldCount() + getInputs().size();
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
index 3165bb5..3f83a2a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
@@ -18,28 +18,18 @@
package org.apache.ignite.internal.processors.query.calcite.rel.set;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* Physical node for MINUS (EXCEPT) operator which inputs satisfy SINGLE distribution.
*/
-public class IgniteSingleMinus extends IgniteMinusBase {
+public class IgniteSingleMinus extends IgniteMinus implements IgniteSingleSetOp {
/** {@inheritDoc} */
public IgniteSingleMinus(
RelOptCluster cluster,
@@ -56,63 +46,6 @@ public class IgniteSingleMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- boolean rewindable = inputTraits.stream()
- .map(TraitUtils::rewindability)
- .allMatch(RewindabilityTrait::rewindable);
-
- if (rewindable)
- return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
- Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
- }
-
- /** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
- IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
-
- if (IgniteDistributions.single().satisfies(distr)) {
- return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
- Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inputTraits
- ) {
- boolean single = inputTraits.stream()
- .map(TraitUtils::distribution)
- .allMatch(d -> d.satisfies(IgniteDistributions.single()));
-
- if (!single)
- return ImmutableList.of();
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(IgniteDistributions.single()), inputTraits));
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
- RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits
- ) {
- Set<CorrelationId> correlationIds = inTraits.stream()
- .map(TraitUtils::correlation)
- .flatMap(corrTr -> corrTr.correlationIds().stream())
- .collect(Collectors.toSet());
-
- return ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(correlationIds)),
- inTraits));
- }
-
- /** {@inheritDoc} */
@Override public IgniteSingleMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
return new IgniteSingleMinus(getCluster(), traitSet, inputs, all);
}
@@ -128,7 +61,7 @@ public class IgniteSingleMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override protected int aggregateFieldsCount() {
+ @Override public int aggregateFieldsCount() {
return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleSetOp.java
similarity index 58%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleSetOp.java
index 3165bb5..dc20a70 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleMinus.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSingleSetOp.java
@@ -21,42 +21,22 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
- * Physical node for MINUS (EXCEPT) operator which inputs satisfy SINGLE distribution.
+ * Physical node for set op (MINUS, INTERSECT) whose inputs satisfy SINGLE distribution.
*/
-public class IgniteSingleMinus extends IgniteMinusBase {
+public interface IgniteSingleSetOp extends IgniteSetOp {
/** {@inheritDoc} */
- public IgniteSingleMinus(
- RelOptCluster cluster,
- RelTraitSet traitSet,
- List<RelNode> inputs,
- boolean all
- ) {
- super(cluster, traitSet, inputs, all);
- }
-
- /** */
- public IgniteSingleMinus(RelInput input) {
- super(input);
- }
-
- /** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
) {
@@ -72,19 +52,16 @@ public class IgniteSingleMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
- IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
-
- if (IgniteDistributions.single().satisfies(distr)) {
- return Pair.of(nodeTraits.replace(IgniteDistributions.single()),
- Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
- }
+ @Override public default Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ if (TraitUtils.distribution(nodeTraits) == IgniteDistributions.single())
+ return Pair.of(nodeTraits, Commons.transform(inTraits, t -> t.replace(IgniteDistributions.single())));
return null;
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
) {
@@ -99,7 +76,7 @@ public class IgniteSingleMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
+ @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(
RelTraitSet nodeTraits,
List<RelTraitSet> inTraits
) {
@@ -113,22 +90,7 @@ public class IgniteSingleMinus extends IgniteMinusBase {
}
/** {@inheritDoc} */
- @Override public IgniteSingleMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new IgniteSingleMinus(getCluster(), traitSet, inputs, all);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteSingleMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
- }
-
- /** {@inheritDoc} */
- @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
- return visitor.visit(this);
- }
-
- /** {@inheritDoc} */
- @Override protected int aggregateFieldsCount() {
- return getInput(0).getRowType().getFieldCount() + COUNTER_FIELDS_CNT;
+ @Override public default AggregateType aggregateType() {
+ return AggregateType.SINGLE;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java
deleted file mode 100644
index 7e11c0c..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/MinusConverterRule.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.logical.LogicalMinus;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-
-/**
- * MINUS (EXCEPT) operation converter rule.
- */
-public class MinusConverterRule {
- /** */
- public static final RelOptRule SINGLE = new SingleMinusConverterRule();
-
- /** */
- public static final RelOptRule MAP_REDUCE = new MapReduceMinusConverterRule();
-
- /** */
- private MinusConverterRule() {
- // No-op.
- }
-
- /** */
- private static class SingleMinusConverterRule extends AbstractIgniteConverterRule<LogicalMinus> {
- /** */
- SingleMinusConverterRule() {
- super(LogicalMinus.class, "SingleMinusConverterRule");
- }
-
- /** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalMinus setOp) {
- RelOptCluster cluster = setOp.getCluster();
- RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
- RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
- List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
-
- return new IgniteSingleMinus(cluster, outTrait, inputs, setOp.all);
- }
- }
-
- /** */
- private static class MapReduceMinusConverterRule extends AbstractIgniteConverterRule<LogicalMinus> {
- /** */
- MapReduceMinusConverterRule() {
- super(LogicalMinus.class, "MapReduceMinusConverterRule");
- }
-
- /** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalMinus setOp) {
- RelOptCluster cluster = setOp.getCluster();
- RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
- RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
- List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
-
- RelNode map = new IgniteMapMinus(cluster, outTrait, inputs, setOp.all);
-
- return new IgniteReduceMinus(
- cluster,
- outTrait.replace(IgniteDistributions.single()),
- convert(map, inTrait.replace(IgniteDistributions.single())),
- setOp.all,
- cluster.getTypeFactory().leastRestrictive(Util.transform(inputs, RelNode::getRowType))
- );
- }
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java
new file mode 100644
index 0000000..980685c
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SetOpConverterRule.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapIntersect;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceIntersect;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleIntersect;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+
+/**
+ * Converter rules turning logical MINUS and INTERSECT into Ignite single or map-reduce set op nodes.
+ */
+public class SetOpConverterRule {
+ /** Rule converting {@link LogicalMinus} to {@link IgniteSingleMinus} over single-distributed inputs. */
+ public static final RelOptRule SINGLE_MINUS = new SingleMinusConverterRule();
+
+ /** Rule converting {@link LogicalIntersect} to {@link IgniteSingleIntersect} over single-distributed inputs. */
+ public static final RelOptRule SINGLE_INTERSECT = new SingleIntersectConverterRule();
+
+ /** Rule converting {@link LogicalMinus} to {@link IgniteMapMinus} followed by {@link IgniteReduceMinus}. */
+ public static final RelOptRule MAP_REDUCE_MINUS = new MapReduceMinusConverterRule();
+
+ /** Rule converting {@link LogicalIntersect} to {@link IgniteMapIntersect} followed by {@link IgniteReduceIntersect}. */
+ public static final RelOptRule MAP_REDUCE_INTERSECT = new MapReduceIntersectConverterRule();
+
+ /** Private constructor: the class only exposes the static rule instances above. */
+ private SetOpConverterRule() {
+ // No-op.
+ }
+
+ /** Base rule: converts all inputs to single distribution and builds one physical node over them. */
+ private abstract static class SingleSetOpConverterRule<T extends SetOp> extends AbstractIgniteConverterRule<T> {
+ /** Passes {@code cls} and {@code desc} through to {@link AbstractIgniteConverterRule}. */
+ SingleSetOpConverterRule(Class<T> cls, String desc) {
+ super(cls, desc);
+ }
+
+ /** Node factory method: creates the physical set op node from the converted inputs and the ALL flag. */
+ abstract PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all);
+
+ /** {@inheritDoc} */
+ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, T setOp) {
+ RelOptCluster cluster = setOp.getCluster();
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+ List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
+
+ return createNode(cluster, outTrait, inputs, setOp.all);
+ }
+ }
+
+ /** Single-distribution rule for MINUS: produces {@link IgniteSingleMinus}. */
+ private static class SingleMinusConverterRule extends SingleSetOpConverterRule<LogicalMinus> {
+ /** Creates the rule for {@link LogicalMinus} nodes. */
+ SingleMinusConverterRule() {
+ super(LogicalMinus.class, "SingleMinusConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+ boolean all) {
+ return new IgniteSingleMinus(cluster, traits, inputs, all);
+ }
+ }
+
+ /** Single-distribution rule for INTERSECT: produces {@link IgniteSingleIntersect}. */
+ private static class SingleIntersectConverterRule extends SingleSetOpConverterRule<LogicalIntersect> {
+ /** Creates the rule for {@link LogicalIntersect} nodes. */
+ SingleIntersectConverterRule() {
+ super(LogicalIntersect.class, "SingleIntersectConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override PhysicalNode createNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+ boolean all) {
+ return new IgniteSingleIntersect(cluster, traits, inputs, all);
+ }
+ }
+
+ /** Base rule: builds a map node over the inputs and a reduce node over its single-distributed output. */
+ private abstract static class MapReduceSetOpConverterRule<T extends SetOp> extends AbstractIgniteConverterRule<T> {
+ /** Passes {@code cls} and {@code desc} through to {@link AbstractIgniteConverterRule}. */
+ MapReduceSetOpConverterRule(Class<T> cls, String desc) {
+ super(cls, desc);
+ }
+
+ /** Map node factory method: creates the map phase node over the converted inputs. */
+ abstract PhysicalNode createMapNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+ boolean all);
+
+ /** Reduce node factory method: creates the reduce phase node over the map node with the given row type. */
+ abstract PhysicalNode createReduceNode(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ boolean all, RelDataType rowType);
+
+ /** {@inheritDoc} */
+ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, T setOp) {
+ RelOptCluster cluster = setOp.getCluster();
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ List<RelNode> inputs = Util.transform(setOp.getInputs(), rel -> convert(rel, inTrait));
+
+ RelNode map = createMapNode(cluster, outTrait, inputs, setOp.all);
+
+ return createReduceNode(
+ cluster,
+ outTrait.replace(IgniteDistributions.single()),
+ convert(map, inTrait.replace(IgniteDistributions.single())),
+ setOp.all,
+ cluster.getTypeFactory().leastRestrictive(Util.transform(inputs, RelNode::getRowType))
+ );
+ }
+ }
+
+ /** Map-reduce rule for MINUS: produces {@link IgniteMapMinus} and {@link IgniteReduceMinus}. */
+ private static class MapReduceMinusConverterRule extends MapReduceSetOpConverterRule<LogicalMinus> {
+ /** Creates the rule for {@link LogicalMinus} nodes. */
+ MapReduceMinusConverterRule() {
+ super(LogicalMinus.class, "MapReduceMinusConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override PhysicalNode createMapNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+ boolean all) {
+ return new IgniteMapMinus(cluster, traits, inputs, all);
+ }
+
+ /** {@inheritDoc} */
+ @Override PhysicalNode createReduceNode(RelOptCluster cluster, RelTraitSet traits, RelNode input, boolean all,
+ RelDataType rowType) {
+ return new IgniteReduceMinus(cluster, traits, input, all, rowType);
+ }
+ }
+
+ /** Map-reduce rule for INTERSECT: produces {@link IgniteMapIntersect} and {@link IgniteReduceIntersect}. */
+ private static class MapReduceIntersectConverterRule extends MapReduceSetOpConverterRule<LogicalIntersect> {
+ /** Creates the rule for {@link LogicalIntersect} nodes. */
+ MapReduceIntersectConverterRule() {
+ super(LogicalIntersect.class, "MapReduceIntersectConverterRule");
+ }
+
+ /** {@inheritDoc} */
+ @Override PhysicalNode createMapNode(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+ boolean all) {
+ return new IgniteMapIntersect(cluster, traits, inputs, all);
+ }
+
+ /** {@inheritDoc} */
+ @Override PhysicalNode createReduceNode(RelOptCluster cluster, RelTraitSet traits, RelNode input, boolean all,
+ RelDataType rowType) {
+ return new IgniteReduceIntersect(cluster, traits, input, all, rowType);
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index d2aa509..84aa2f0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -25,14 +25,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-
-import javax.cache.Cache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
@@ -484,25 +481,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
awaitPartitionMapExchange(true, true, null);
}
- /** Copy cache with it's content to new replicated cache. */
- private void copyCacheAsReplicated(String cacheName) throws InterruptedException {
- IgniteCache<Object, Object> cache = client.cache(cacheName);
-
- CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<Object, Object>(
- cache.getConfiguration(CacheConfiguration.class));
-
- ccfg.setName(cacheName + "Replicated");
- ccfg.setCacheMode(CacheMode.REPLICATED);
- ccfg.getQueryEntities().forEach(qe -> qe.setTableName(qe.getTableName() + "_repl"));
-
- IgniteCache<Object, Object> replCache = client.getOrCreateCache(ccfg);
-
- for (Cache.Entry<?, ?> entry : cache)
- replCache.put(entry.getKey(), entry.getValue());
-
- awaitPartitionMapExchange(true, true, null);
- }
-
/** */
@Test
public void testOrderingByColumnOutsideSelectList() throws InterruptedException {
@@ -701,207 +679,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertEquals(1, rows.size());
}
- /** */
- @Test
- public void testExcept() throws Exception {
- populateTables();
-
- List<List<?>> rows = sql("SELECT name FROM Orders EXCEPT SELECT name from Account");
-
- assertEquals(1, rows.size());
- assertEquals("Igor", rows.get(0).get(0));
- }
-
- /** */
- @Test
- public void testExceptFromEmpty() throws Exception {
- populateTables();
-
- copyCacheAsReplicated("orders");
- copyCacheAsReplicated("account");
-
- List<List<?>> rows = sql("SELECT name FROM Orders WHERE salary < 0 EXCEPT SELECT name FROM Account");
-
- assertEquals(0, rows.size());
-
- rows = sql("SELECT name FROM Orders_repl WHERE salary < 0 EXCEPT SELECT name FROM Account_repl");
-
- assertEquals(0, rows.size());
- }
-
- /** */
- @Test
- public void testExceptSeveralColumns() throws Exception {
- populateTables();
-
- List<List<?>> rows = sql("SELECT name, salary FROM Orders EXCEPT SELECT name, salary from Account");
-
- assertEquals(4, rows.size());
- assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
- assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
- }
-
- /** */
- @Test
- public void testExceptAll() throws Exception {
- populateTables();
-
- List<List<?>> rows = sql("SELECT name FROM Orders EXCEPT ALL SELECT name from Account");
-
- assertEquals(4, rows.size());
- assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
- assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
- }
-
- /** */
- @Test
- public void testExceptNested() throws Exception {
- populateTables();
-
- List<List<?>> rows =
- sql("SELECT name FROM Orders EXCEPT (SELECT name FROM Orders EXCEPT SELECT name from Account)");
-
- assertEquals(2, rows.size());
- assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
- assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
- }
-
- /** */
- @Test
- public void testExceptReplicatedWithPartitioned() throws Exception {
- populateTables();
- copyCacheAsReplicated("orders");
-
- List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name from Account");
-
- assertEquals(4, rows.size());
- assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
- assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
- }
-
- /** */
- @Test
- public void testExceptReplicated() throws Exception {
- populateTables();
- copyCacheAsReplicated("orders");
- copyCacheAsReplicated("account");
-
- List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name from Account_repl");
-
- assertEquals(4, rows.size());
- assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
- assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
- }
-
- /** */
- @Test
- public void testExceptMerge() throws Exception {
- populateTables();
- copyCacheAsReplicated("orders");
-
- List<List<?>> rows = sql("SELECT name FROM Orders_repl EXCEPT ALL SELECT name FROM Account EXCEPT ALL " +
- "SELECT name FROM orders WHERE salary < 11");
-
- assertEquals(3, rows.size());
- assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor")));
- assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
- }
-
- /** */
- @Test
- public void testExceptBigBatch() throws Exception {
- client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
- .setName("cache1")
- .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table1")))
- .setBackups(2)
- );
-
- client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
- .setName("cache2")
- .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table2")))
- .setBackups(1)
- );
-
- copyCacheAsReplicated("cache1");
- copyCacheAsReplicated("cache2");
-
- try (IgniteDataStreamer<Integer, Integer> ds1 = client.dataStreamer("cache1");
- IgniteDataStreamer<Integer, Integer> ds2 = client.dataStreamer("cache2");
- IgniteDataStreamer<Integer, Integer> ds3 = client.dataStreamer("cache1Replicated");
- IgniteDataStreamer<Integer, Integer> ds4 = client.dataStreamer("cache2Replicated")
- ) {
- int key = 0;
-
- for (int i = 0; i < 5; i++) {
- for (int j = 0; j < ((i == 0) ? 1 : (1 << (i * 4 - 1))); j++) {
- // Cache1 keys count: 1 of "0", 8 of "1", 128 of "2", 2048 of "3", 32768 of "4".
- ds1.addData(key++, i);
- ds3.addData(key++, i);
-
- // Cache2 keys count: 1 of "5", 128 of "3", 32768 of "1".
- if ((i & 1) == 0) {
- ds2.addData(key++, 5 - i);
- ds4.addData(key++, 5 - i);
- }
- }
- }
- }
-
- awaitPartitionMapExchange(true, true, null);
-
- List<List<?>> rows;
-
- // Check 2 partitioned caches.
- rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT SELECT _val FROM \"cache2\".table2");
-
- assertEquals(3, rows.size());
- assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
- assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
- assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
-
- rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT ALL SELECT _val FROM \"cache2\".table2");
-
- assertEquals(34817, rows.size());
- assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
- assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
- assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
- assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
-
- // Check 1 replicated and 1 partitioned caches.
- rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2\".table2");
-
- assertEquals(3, rows.size());
- assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
- assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
- assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
-
- rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2\".table2");
-
- assertEquals(34817, rows.size());
- assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
- assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
- assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
- assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
-
- // Check 2 replicated caches.
- rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2Replicated\"" +
- ".table2_repl");
-
- assertEquals(3, rows.size());
- assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
- assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
- assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
-
- rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2Replicated\"" +
- ".table2_repl");
-
- assertEquals(34817, rows.size());
- assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
- assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
- assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
- assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
- }
-
/**
* Execute SQL statement on given node.
*
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpExecutionTest.java
similarity index 66%
copy from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
copy to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpExecutionTest.java
index bb7b32c..938071c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpExecutionTest.java
@@ -22,12 +22,14 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -40,9 +42,9 @@ import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.A
import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
/**
- * Test execution of MINUS (EXCEPT) operator.
+ * Abstract test for set operator (MINUS, INTERSECT) execution.
*/
-public class MinusExecutionTest extends AbstractExecutionTest {
+public abstract class AbstractSetOpExecutionTest extends AbstractExecutionTest {
/**
* @throws Exception If failed.
*/
@@ -55,25 +57,25 @@ public class MinusExecutionTest extends AbstractExecutionTest {
/** */
@Test
public void testSingle() {
- checkMinus(true, false);
+ checkSetOp(true, false);
}
/** */
@Test
public void testSingleAll() {
- checkMinus(true, true);
+ checkSetOp(true, true);
}
/** */
@Test
public void testMapReduce() {
- checkMinus(false, false);
+ checkSetOp(false, false);
}
/** */
@Test
public void testMapReduceAll() {
- checkMinus(false, true);
+ checkSetOp(false, true);
}
/** */
@@ -88,11 +90,12 @@ public class MinusExecutionTest extends AbstractExecutionTest {
row("Roman", 1)
);
- // For "single minus" operation, node should not request rows from the next source if result after the previous
- // source is already empty.
+ // For single distribution set operations, node should not request rows from the next source if result after
+ // the previous source is already empty.
ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, data);
ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, data);
- Node<Object[]> node3 = new AbstractNode<Object[]>(ctx, rowType) {
+ ScanNode<Object[]> scan3 = new ScanNode<>(ctx, rowType, Collections.emptyList());
+ Node<Object[]> node4 = new AbstractNode<Object[]>(ctx, rowType) {
@Override protected void rewindInternal() {
// No-op.
}
@@ -106,101 +109,71 @@ public class MinusExecutionTest extends AbstractExecutionTest {
}
};
- MinusNode<Object[]> minusNode = new MinusNode<>(ctx, rowType, SINGLE, false, rowFactory());
- minusNode.register(Arrays.asList(scan1, scan2, node3));
+ List<Node<Object[]>> inputs = Arrays.asList(scan1, scan2, scan3, node4);
+
+ AbstractSetOpNode<Object[]> setOpNode = setOpNodeFactory(ctx, rowType, SINGLE, false, inputs.size());
+ setOpNode.register(inputs);
RootNode<Object[]> root = new RootNode<>(ctx, rowType);
- root.register(minusNode);
+ root.register(setOpNode);
assertFalse(root.hasNext());
}
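+ /** Operator-specific check; {@code single} selects SINGLE over MAP + REDUCE execution, {@code all} is the ALL flag. */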
+ protected abstract void checkSetOp(boolean single, boolean all);
+
/**
* @param single Single.
* @param all All.
*/
- private void checkMinus(boolean single, boolean all) {
+ protected void checkSetOp(boolean single, boolean all, List<List<Object[]>> dataSets, List<Object[]> expectedResult) {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
- ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 1),
- row("Roman", 1),
- row("Igor", 1),
- row("Roman", 2),
- row("Igor", 1),
- row("Igor", 1),
- row("Igor", 2)
- ));
+ List<Node<Object[]>> inputs = dataSets.stream().map(ds -> new ScanNode<>(ctx, rowType, ds))
+ .collect(Collectors.toList());
- ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, Arrays.asList(
- row("Igor", 1),
- row("Roman", 1),
- row("Igor", 1),
- row("Alexey", 1)
- ));
-
- MinusNode<Object[]> minusNode;
-
- if (single) {
- minusNode = new MinusNode<>(
- ctx,
- rowType,
- SINGLE,
- all,
- rowFactory()
- );
- }
- else {
- minusNode = new MinusNode<>(
- ctx,
- rowType,
- MAP,
- all,
- rowFactory()
- );
- }
+ AbstractSetOpNode<Object[]> setOpNode;
+
+ if (single)
+ setOpNode = setOpNodeFactory(ctx, rowType, SINGLE, all, inputs.size());
+ else
+ setOpNode = setOpNodeFactory(ctx, rowType, MAP, all, inputs.size());
- minusNode.register(Arrays.asList(scan1, scan2));
+ setOpNode.register(inputs);
if (!single) {
- MinusNode<Object[]> reduceNode = new MinusNode<>(
- ctx,
- rowType,
- REDUCE,
- all,
- rowFactory()
- );
+ AbstractSetOpNode<Object[]> reduceNode = setOpNodeFactory(ctx, rowType, REDUCE, all, 1);
- reduceNode.register(Collections.singletonList(minusNode));
+ reduceNode.register(Collections.singletonList(setOpNode));
- minusNode = reduceNode;
+ setOpNode = reduceNode;
}
Comparator<Object[]> cmp = ctx.expressionFactory().comparator(RelCollations.of(ImmutableIntList.of(0, 1)));
// Create sort node on the top to check sorted results.
SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType, cmp);
- sortNode.register(minusNode);
+ sortNode.register(setOpNode);
RootNode<Object[]> root = new RootNode<>(ctx, rowType);
root.register(sortNode);
- assertTrue(root.hasNext());
+ assertTrue(F.isEmpty(expectedResult) || root.hasNext());
- if (all) {
- Assert.assertArrayEquals(row("Igor", 1), root.next());
- Assert.assertArrayEquals(row("Igor", 1), root.next());
- }
-
- Assert.assertArrayEquals(row("Igor", 2), root.next());
- Assert.assertArrayEquals(row("Roman", 2), root.next());
+ for (Object[] row : expectedResult)
+ Assert.assertArrayEquals(row, root.next());
assertFalse(root.hasNext());
}
/** */
+ protected abstract AbstractSetOpNode<Object[]> setOpNodeFactory(ExecutionContext<Object[]> ctx, RelDataType rowType,
+ AggregateType type, boolean all, int inputsCnt);
+
+ /** Creates the {@code Object[]} row factory that is passed to the set op nodes under test. */
protected RowHandler.RowFactory<Object[]> rowFactory() {
return new RowHandler.RowFactory<Object[]>() {
/** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectExecutionTest.java
new file mode 100644
index 0000000..31e6169
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectExecutionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+
+/**
+ * Execution test for INTERSECT: runs {@link IntersectNode} over three inputs containing duplicate rows.
+ */
+public class IntersectExecutionTest extends AbstractSetOpExecutionTest {
+ /** {@inheritDoc} */
+ @Override protected AbstractSetOpNode<Object[]> setOpNodeFactory(ExecutionContext<Object[]> ctx,
+ RelDataType rowType, AggregateType type, boolean all, int inputsCnt) {
+ return new IntersectNode<>(ctx, rowType, type, all, rowFactory(), inputsCnt);
+ }
+
+ /** {@inheritDoc} Expects the rows present in all three data sets; with ALL, each at its minimal count. */
+ @Override protected void checkSetOp(boolean single, boolean all) {
+ List<Object[]> ds1 = Arrays.asList(
+ row("Igor", 1),
+ row("Roman", 1),
+ row("Igor", 1),
+ row("Roman", 2),
+ row("Igor", 1),
+ row("Igor", 1),
+ row("Igor", 2)
+ );
+
+ List<Object[]> ds2 = Arrays.asList(
+ row("Igor", 1),
+ row("Roman", 1),
+ row("Igor", 1),
+ row("Igor", 1),
+ row("Alexey", 1)
+ );
+
+ List<Object[]> ds3 = Arrays.asList(
+ row("Igor", 1),
+ row("Roman", 1),
+ row("Igor", 1),
+ row("Roman", 2),
+ row("Alexey", 2)
+ );
+
+ List<Object[]> expectedResult;
+
+ if (all) {
+ expectedResult = Arrays.asList(
+ row("Igor", 1),
+ row("Igor", 1),
+ row("Roman", 1)
+ );
+ }
+ else {
+ expectedResult = Arrays.asList(
+ row("Igor", 1),
+ row("Roman", 1)
+ );
+ }
+
+ checkSetOp(single, all, Arrays.asList(ds1, ds2, ds3), expectedResult);
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
index bb7b32c..8708c98 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java
@@ -18,205 +18,65 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
-import java.util.UUID;
-import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.MAP;
-import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.REDUCE;
-import static org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType.SINGLE;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
/**
* Test execution of MINUS (EXCEPT) operator.
*/
-public class MinusExecutionTest extends AbstractExecutionTest {
- /**
- * @throws Exception If failed.
- */
- @Before
- @Override public void setup() throws Exception {
- nodesCnt = 1;
- super.setup();
- }
-
- /** */
- @Test
- public void testSingle() {
- checkMinus(true, false);
- }
-
- /** */
- @Test
- public void testSingleAll() {
- checkMinus(true, true);
- }
-
- /** */
- @Test
- public void testMapReduce() {
- checkMinus(false, false);
- }
-
- /** */
- @Test
- public void testMapReduceAll() {
- checkMinus(false, true);
+public class MinusExecutionTest extends AbstractSetOpExecutionTest {
+ /** {@inheritDoc} */
+ @Override protected AbstractSetOpNode<Object[]> setOpNodeFactory(ExecutionContext<Object[]> ctx,
+ RelDataType rowType, AggregateType type, boolean all, int inputsCnt) {
+ return new MinusNode<>(ctx, rowType, type, all, rowFactory());
}
- /** */
- @Test
- public void testSingleWithEmptySet() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-
- List<Object[]> data = Arrays.asList(
- row("Igor", 1),
- row("Roman", 1)
- );
-
- // For "single minus" operation, node should not request rows from the next source if result after the previous
- // source is already empty.
- ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, data);
- ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, data);
- Node<Object[]> node3 = new AbstractNode<Object[]>(ctx, rowType) {
- @Override protected void rewindInternal() {
- // No-op.
- }
-
- @Override protected Downstream<Object[]> requestDownstream(int idx) {
- return null;
- }
-
- @Override public void request(int rowsCnt) throws Exception {
- fail("Node should not be requested");
- }
- };
-
- MinusNode<Object[]> minusNode = new MinusNode<>(ctx, rowType, SINGLE, false, rowFactory());
- minusNode.register(Arrays.asList(scan1, scan2, node3));
-
- RootNode<Object[]> root = new RootNode<>(ctx, rowType);
- root.register(minusNode);
-
- assertFalse(root.hasNext());
- }
-
- /**
- * @param single Single.
- * @param all All.
- */
- private void checkMinus(boolean single, boolean all) {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, String.class, int.class);
-
- ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, Arrays.asList(
+ /** {@inheritDoc} */
+ @Override protected void checkSetOp(boolean single, boolean all) {
+ List<Object[]> ds1 = Arrays.asList(
row("Igor", 1),
row("Roman", 1),
row("Igor", 1),
row("Roman", 2),
row("Igor", 1),
row("Igor", 1),
- row("Igor", 2)
- ));
+ row("Igor", 1),
+ row("Igor", 2),
+ row("Alexey", 2)
+ );
- ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, Arrays.asList(
+ List<Object[]> ds2 = Arrays.asList(
row("Igor", 1),
row("Roman", 1),
row("Igor", 1),
row("Alexey", 1)
- ));
+ );
+
+ List<Object[]> ds3 = Arrays.asList(
+ row("Igor", 1),
+ row("Alexey", 1),
+ row("Alexey", 2)
+ );
- MinusNode<Object[]> minusNode;
+ List<Object[]> expectedResult;
- if (single) {
- minusNode = new MinusNode<>(
- ctx,
- rowType,
- SINGLE,
- all,
- rowFactory()
+ if (all) {
+ expectedResult = Arrays.asList(
+ row("Igor", 1),
+ row("Igor", 1),
+ row("Igor", 2),
+ row("Roman", 2)
);
}
else {
- minusNode = new MinusNode<>(
- ctx,
- rowType,
- MAP,
- all,
- rowFactory()
- );
- }
-
- minusNode.register(Arrays.asList(scan1, scan2));
-
- if (!single) {
- MinusNode<Object[]> reduceNode = new MinusNode<>(
- ctx,
- rowType,
- REDUCE,
- all,
- rowFactory()
+ expectedResult = Arrays.asList(
+ row("Igor", 2),
+ row("Roman", 2)
);
-
- reduceNode.register(Collections.singletonList(minusNode));
-
- minusNode = reduceNode;
}
- Comparator<Object[]> cmp = ctx.expressionFactory().comparator(RelCollations.of(ImmutableIntList.of(0, 1)));
-
- // Create sort node on the top to check sorted results.
- SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType, cmp);
- sortNode.register(minusNode);
-
- RootNode<Object[]> root = new RootNode<>(ctx, rowType);
- root.register(sortNode);
-
- assertTrue(root.hasNext());
-
- if (all) {
- Assert.assertArrayEquals(row("Igor", 1), root.next());
- Assert.assertArrayEquals(row("Igor", 1), root.next());
- }
-
- Assert.assertArrayEquals(row("Igor", 2), root.next());
- Assert.assertArrayEquals(row("Roman", 2), root.next());
-
- assertFalse(root.hasNext());
- }
-
- /** */
- protected RowHandler.RowFactory<Object[]> rowFactory() {
- return new RowHandler.RowFactory<Object[]>() {
- /** */
- @Override public RowHandler<Object[]> handler() {
- return ArrayRowHandler.INSTANCE;
- }
-
- /** */
- @Override public Object[] create() {
- throw new AssertionError();
- }
-
- /** */
- @Override public Object[] create(Object... fields) {
- return fields;
- }
- };
+ checkSetOp(single, all, Arrays.asList(ds1, ds2, ds3), expectedResult);
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 94fe7a7..97a536e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -38,7 +38,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@WithSystemProperty(key = "calcite.debug", value = "false")
public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
/** */
- private static IgniteEx client;
+ protected static IgniteEx client;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
new file mode 100644
index 0000000..07a7f55
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.List;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+/**
+ * Integration test for set op (EXCEPT, INTERSECT).
+ */
+public class SetOpIntegrationTest extends AbstractBasicIntegrationTest {
+ /** */
+ private void populateTables() throws InterruptedException {
+ IgniteCache<Integer, Employer> emp1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp1")
+ .setSqlSchema("PUBLIC")
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp1")))
+ .setBackups(2)
+ );
+
+ IgniteCache<Integer, Employer> emp2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>("emp2")
+ .setSqlSchema("PUBLIC")
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp2")))
+ .setBackups(1)
+ );
+
+ emp1.put(1, new Employer("Igor", 10d));
+ emp1.put(2, new Employer("Igor", 11d));
+ emp1.put(3, new Employer("Igor", 12d));
+ emp1.put(4, new Employer("Igor1", 13d));
+ emp1.put(5, new Employer("Igor1", 13d));
+ emp1.put(6, new Employer("Igor1", 13d));
+ emp1.put(7, new Employer("Roman", 14d));
+
+ emp2.put(1, new Employer("Roman", 10d));
+ emp2.put(2, new Employer("Roman", 11d));
+ emp2.put(3, new Employer("Roman", 12d));
+ emp2.put(4, new Employer("Roman", 13d));
+ emp2.put(5, new Employer("Igor1", 13d));
+ emp2.put(6, new Employer("Igor1", 13d));
+
+ awaitPartitionMapExchange(true, true, null);
+ }
+
+ /** Copy cache with its content to a new replicated cache. */
+ private void copyCacheAsReplicated(String cacheName) throws InterruptedException {
+ IgniteCache<Object, Object> cache = client.cache(cacheName);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<Object, Object>(
+ cache.getConfiguration(CacheConfiguration.class));
+
+ ccfg.setName(cacheName + "Replicated");
+ ccfg.setCacheMode(CacheMode.REPLICATED);
+ ccfg.getQueryEntities().forEach(qe -> qe.setTableName(qe.getTableName() + "_repl"));
+
+ IgniteCache<Object, Object> replCache = client.getOrCreateCache(ccfg);
+
+ for (Cache.Entry<?, ?> entry : cache)
+ replCache.put(entry.getKey(), entry.getValue());
+
+ awaitPartitionMapExchange(true, true, null);
+ }
+
+ /** */
+ @Test
+ public void testExcept() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name FROM emp1 EXCEPT SELECT name FROM emp2");
+
+ assertEquals(1, rows.size());
+ assertEquals("Igor", rows.get(0).get(0));
+ }
+
+ /** */
+ @Test
+ public void testExceptFromEmpty() throws Exception {
+ populateTables();
+
+ copyCacheAsReplicated("emp1");
+ copyCacheAsReplicated("emp2");
+
+ List<List<?>> rows = sql("SELECT name FROM emp1 WHERE salary < 0 EXCEPT SELECT name FROM emp2");
+
+ assertEquals(0, rows.size());
+
+ rows = sql("SELECT name FROM emp1_repl WHERE salary < 0 EXCEPT SELECT name FROM emp2_repl");
+
+ assertEquals(0, rows.size());
+ }
+
+ /** */
+ @Test
+ public void testExceptSeveralColumns() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name, salary FROM emp1 EXCEPT SELECT name, salary FROM emp2");
+
+ assertEquals(4, rows.size());
+ assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+ }
+
+ /** */
+ @Test
+ public void testExceptAll() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name FROM emp1 EXCEPT ALL SELECT name FROM emp2");
+
+ assertEquals(4, rows.size());
+ assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptNested() throws Exception {
+ populateTables();
+
+ List<List<?>> rows =
+ sql("SELECT name FROM emp1 EXCEPT (SELECT name FROM emp1 EXCEPT SELECT name FROM emp2)");
+
+ assertEquals(2, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptReplicatedWithPartitioned() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("emp1");
+
+ List<List<?>> rows = sql("SELECT name FROM emp1_repl EXCEPT ALL SELECT name FROM emp2");
+
+ assertEquals(4, rows.size());
+ assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptReplicated() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("emp1");
+ copyCacheAsReplicated("emp2");
+
+ List<List<?>> rows = sql("SELECT name FROM emp1_repl EXCEPT ALL SELECT name FROM emp2_repl");
+
+ assertEquals(4, rows.size());
+ assertEquals(3, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testExceptMerge() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("emp1");
+
+ List<List<?>> rows = sql("SELECT name FROM emp1_repl EXCEPT ALL SELECT name FROM emp2 EXCEPT ALL " +
+ "SELECT name FROM emp1 WHERE salary < 11");
+
+ assertEquals(3, rows.size());
+ assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testSetOpBigBatch() throws Exception {
+ client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+ .setName("cache1")
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table1")))
+ .setBackups(2)
+ );
+
+ client.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
+ .setName("cache2")
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, Integer.class).setTableName("table2")))
+ .setBackups(1)
+ );
+
+ copyCacheAsReplicated("cache1");
+ copyCacheAsReplicated("cache2");
+
+ try (IgniteDataStreamer<Integer, Integer> ds1 = client.dataStreamer("cache1");
+ IgniteDataStreamer<Integer, Integer> ds2 = client.dataStreamer("cache2");
+ IgniteDataStreamer<Integer, Integer> ds3 = client.dataStreamer("cache1Replicated");
+ IgniteDataStreamer<Integer, Integer> ds4 = client.dataStreamer("cache2Replicated")
+ ) {
+ int key = 0;
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < ((i == 0) ? 1 : (1 << (i * 4 - 1))); j++) {
+ // Cache1 keys count: 1 of "0", 8 of "1", 128 of "2", 2048 of "3", 32768 of "4".
+ ds1.addData(key++, i);
+ ds3.addData(key++, i);
+
+ // Cache2 keys count: 1 of "5", 128 of "3", 32768 of "1".
+ if ((i & 1) == 0) {
+ ds2.addData(key++, 5 - i);
+ ds4.addData(key++, 5 - i);
+ }
+ }
+ }
+ }
+
+ awaitPartitionMapExchange(true, true, null);
+
+ List<List<?>> rows;
+
+ // Check 2 partitioned caches.
+ rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(3, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1\".table1 EXCEPT ALL SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(34817, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+ assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1\".table1 INTERSECT SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(2, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(1)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(3)));
+
+ rows = sql("SELECT _val FROM \"cache1\".table1 INTERSECT ALL SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(136, rows.size());
+ assertEquals(8, F.size(rows, r -> r.get(0).equals(1)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(3)));
+
+ // Check 1 replicated and 1 partitioned caches.
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(3, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(34817, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+ assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(2, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(1)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(3)));
+
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT ALL SELECT _val FROM \"cache2\".table2");
+
+ assertEquals(136, rows.size());
+ assertEquals(8, F.size(rows, r -> r.get(0).equals(1)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(3)));
+
+ // Check 2 replicated caches.
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT SELECT _val FROM \"cache2Replicated\"" +
+ ".table2_repl");
+
+ assertEquals(3, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl EXCEPT ALL SELECT _val FROM \"cache2Replicated\"" +
+ ".table2_repl");
+
+ assertEquals(34817, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(0)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
+ assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
+ assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT SELECT _val FROM \"cache2Replicated\"" +
+ ".table2_repl");
+
+ assertEquals(2, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(1)));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals(3)));
+
+ rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT ALL SELECT _val FROM \"cache2Replicated\"" +
+ ".table2_repl");
+
+ assertEquals(136, rows.size());
+ assertEquals(8, F.size(rows, r -> r.get(0).equals(1)));
+ assertEquals(128, F.size(rows, r -> r.get(0).equals(3)));
+ }
+
+ /** */
+ @Test
+ public void testIntersect() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name FROM emp1 INTERSECT SELECT name FROM emp2");
+
+ assertEquals(2, rows.size());
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Igor1")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+ }
+
+ /** */
+ @Test
+ public void testInstersectAll() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name FROM emp1 INTERSECT ALL SELECT name FROM emp2");
+
+ assertEquals(3, rows.size());
+ assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+ }
+
+ /** */
+ @Test
+ public void testIntersectEmpty() throws Exception {
+ populateTables();
+
+ copyCacheAsReplicated("emp1");
+ copyCacheAsReplicated("emp2");
+
+ List<List<?>> rows = sql("SELECT name FROM emp1 WHERE salary < 0 INTERSECT SELECT name FROM emp2");
+
+ assertEquals(0, rows.size());
+
+ rows = sql("SELECT name FROM emp1_repl WHERE salary < 0 INTERSECT SELECT name FROM emp2_repl");
+
+ assertEquals(0, rows.size());
+ }
+
+ /** */
+ @Test
+ public void testIntersectMerge() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("emp1");
+
+ List<List<?>> rows = sql("SELECT name FROM emp1_repl INTERSECT ALL SELECT name FROM emp2 INTERSECT ALL " +
+ "SELECT name FROM emp1 WHERE salary < 14");
+
+ assertEquals(2, rows.size());
+ assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ @Test
+ public void testIntersectReplicated() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("emp1");
+ copyCacheAsReplicated("emp2");
+
+ List<List<?>> rows = sql("SELECT name FROM emp1_repl INTERSECT ALL SELECT name FROM emp2_repl");
+
+ assertEquals(3, rows.size());
+ assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+ }
+
+ /** */
+ @Test
+ public void testIntersectReplicatedWithPartitioned() throws Exception {
+ populateTables();
+ copyCacheAsReplicated("emp1");
+
+ List<List<?>> rows = sql("SELECT name FROM emp1_repl INTERSECT ALL SELECT name FROM emp2");
+
+ assertEquals(3, rows.size());
+ assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+ assertEquals(1, F.size(rows, r -> r.get(0).equals("Roman")));
+ }
+
+ /** */
+ @Test
+ public void testIntersectSeveralColumns() throws Exception {
+ populateTables();
+
+ List<List<?>> rows = sql("SELECT name, salary FROM emp1 INTERSECT ALL SELECT name, salary FROM emp2");
+
+ assertEquals(2, rows.size());
+ assertEquals(2, F.size(rows, r -> r.get(0).equals("Igor1")));
+ }
+
+ /** */
+ private List<List<?>> sql(String sql) throws IgniteInterruptedCheckedException {
+ QueryEngine engine = Commons.lookupComponent(client.context(), QueryEngine.class);
+
+ return engine.query(null, "PUBLIC", sql).get(0).getAll();
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 91256552..4779a20 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -475,6 +475,8 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
String... disabledRules) throws Exception {
IgniteRel plan = physicalPlan(sql, schema, disabledRules);
+ checkSplitAndSerialization(plan, schema);
+
if (!predicate.test((T)plan)) {
String invalidPlanMsg = "Invalid plan (" + lastErrorMsg + "):\n" +
RelOptUtil.toString(plan, SqlExplainLevel.ALL_ATTRIBUTES);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SetOpPlannerTest.java
similarity index 52%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SetOpPlannerTest.java
index 1b04e91..e307c5f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/ExceptPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SetOpPlannerTest.java
@@ -20,28 +20,47 @@ package org.apache.ignite.internal.processors.query.calcite.planner;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapIntersect;
import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapSetOp;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceIntersect;
import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceSetOp;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleIntersect;
import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleSetOp;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
- * Test to verify EXCEPT operator.
+ * Test to verify set op (EXCEPT, INTERSECT).
*/
-public class ExceptPlannerTest extends AbstractPlannerTest {
+@RunWith(Parameterized.class)
+public class SetOpPlannerTest extends AbstractPlannerTest {
+ /** Set operation under test. */
+ @Parameterized.Parameter
+ public SetOp setOp;
+
+ /** */
+ @Parameterized.Parameters(name = "SetOp = {0}")
+ public static List<Object[]> parameters() {
+ return Stream.of(SetOp.values()).map(a -> new Object[]{a}).collect(Collectors.toList());
+ }
+
/** Public schema. */
private IgniteSchema publicSchema;
- /** Last error. */
- private String lastError;
-
/** {@inheritDoc} */
@Before
@Override public void setup() {
@@ -83,14 +102,14 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptRandom() throws Exception {
- String sql = "" +
+ public void testSetOpRandom() throws Exception {
+ String sql =
"SELECT * FROM random_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM random_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class).and(n -> !n.all)
- .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce).and(n -> !n.all())
+ .and(hasChildThat(isInstanceOf(setOp.map)
.and(input(0, isTableScan("random_tbl1")))
.and(input(1, isTableScan("random_tbl2")))
))
@@ -101,14 +120,14 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptAllRandom() throws Exception {
- String sql = "" +
+ public void testSetOpAllRandom() throws Exception {
+ String sql =
"SELECT * FROM random_tbl1 " +
- "EXCEPT ALL " +
+ setOpAll() +
"SELECT * FROM random_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class).and(n -> n.all)
- .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce).and(IgniteSetOp::all)
+ .and(hasChildThat(isInstanceOf(setOp.map)
.and(input(0, isTableScan("random_tbl1")))
.and(input(1, isTableScan("random_tbl2")))
))
@@ -119,13 +138,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptBroadcast() throws Exception {
- String sql = "" +
+ public void testSetOpBroadcast() throws Exception {
+ String sql =
"SELECT * FROM broadcast_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM broadcast_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
.and(input(0, isTableScan("broadcast_tbl1")))
.and(input(1, isTableScan("broadcast_tbl2")))
);
@@ -135,13 +154,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptSingle() throws Exception {
- String sql = "" +
+ public void testSetOpSingle() throws Exception {
+ String sql =
"SELECT * FROM single_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM single_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
.and(input(0, isTableScan("single_tbl1")))
.and(input(1, isTableScan("single_tbl2"))));
}
@@ -150,13 +169,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptSingleAndRandom() throws Exception {
- String sql = "" +
+ public void testSetOpSingleAndRandom() throws Exception {
+ String sql =
"SELECT * FROM single_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM random_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
.and(input(0, isTableScan("single_tbl1")))
.and(input(1, hasChildThat(isTableScan("random_tbl1")))));
}
@@ -165,13 +184,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptSingleAndAffinity() throws Exception {
- String sql = "" +
+ public void testSetOpSingleAndAffinity() throws Exception {
+ String sql =
"SELECT * FROM single_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM affinity_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
.and(input(0, isTableScan("single_tbl1")))
.and(input(1, hasChildThat(isTableScan("affinity_tbl1")))));
}
@@ -180,13 +199,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptSingleAndBroadcast() throws Exception {
- String sql = "" +
+ public void testSetOpSingleAndBroadcast() throws Exception {
+ String sql =
"SELECT * FROM single_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM broadcast_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
.and(input(0, isTableScan("single_tbl1")))
.and(input(1, isTableScan("broadcast_tbl1")))
);
@@ -196,14 +215,14 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptAffinity() throws Exception {
- String sql = "" +
+ public void testSetOpAffinity() throws Exception {
+ String sql =
"SELECT * FROM affinity_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM affinity_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class)
- .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
.and(input(0, isTableScan("affinity_tbl1")))
.and(input(1, isTableScan("affinity_tbl2")))
))
@@ -214,13 +233,13 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptBroadcastAndRandom() throws Exception {
- String sql = "" +
+ public void testSetOpBroadcastAndRandom() throws Exception {
+ String sql =
"SELECT * FROM random_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM broadcast_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
.and(input(0, hasChildThat(isTableScan("random_tbl1"))))
.and(input(1, isTableScan("broadcast_tbl1")))
);
@@ -230,64 +249,88 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptRandomNested() throws Exception {
- String sql = "" +
- "SELECT * FROM random_tbl2 EXCEPT (" +
+ public void testSetOpRandomNested() throws Exception {
+ String sql =
+ "SELECT * FROM random_tbl2 " + setOp() + "(" +
" SELECT * FROM random_tbl1 " +
- " EXCEPT " +
+ setOp() +
" SELECT * FROM random_tbl2" +
")";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
- .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
- .and(input(1, isInstanceOf(IgniteReduceMinus.class)
- .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
- .and(input(0, isTableScan("random_tbl1")))
- .and(input(1, isTableScan("random_tbl2")))
+ if (setOp == SetOp.EXCEPT) {
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+ .and(input(0, hasChildThat(isTableScan("random_tbl2"))))
+ .and(input(1, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ ))
))
- ))
- );
+ );
+ }
+ else {
+ // INTERSECT operator is associative as well as commutative, so the nested operator is merged into the outer one.
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
+ .and(input(0, isTableScan("random_tbl2")))
+ .and(input(1, isTableScan("random_tbl1")))
+ .and(input(2, isTableScan("random_tbl2")))
+ ))
+ );
+ }
}
/**
* @throws Exception If failed.
*/
@Test
- public void testExceptBroadcastAndRandomNested() throws Exception {
- String sql = "" +
- "SELECT * FROM broadcast_tbl1 EXCEPT (" +
+ public void testSetOpBroadcastAndRandomNested() throws Exception {
+ String sql =
+ "SELECT * FROM broadcast_tbl1 " + setOp() + "(" +
" SELECT * FROM random_tbl1 " +
- " EXCEPT " +
+ setOp() +
" SELECT * FROM random_tbl2" +
")";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteSingleMinus.class)
- .and(input(0, isTableScan("broadcast_tbl1")))
- .and(input(1, isInstanceOf(IgniteReduceMinus.class)
- .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
- .and(input(0, isTableScan("random_tbl1")))
- .and(input(1, isTableScan("random_tbl2")))
+ if (setOp == SetOp.EXCEPT) {
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.single)
+ .and(input(0, isTableScan("broadcast_tbl1")))
+ .and(input(1, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
+ .and(input(0, isTableScan("random_tbl1")))
+ .and(input(1, isTableScan("random_tbl2")))
+ ))
))
- ))
- );
+ );
+ }
+ else {
+ // INTERSECT operator is associative as well as commutative, so the nested operator is merged into the outer one.
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
+ .and(input(0, nodeOrAnyChild(isTableScan("broadcast_tbl1"))))
+ .and(input(1, isTableScan("random_tbl1")))
+ .and(input(2, isTableScan("random_tbl2")))
+ ))
+ );
+ }
}
/**
* @throws Exception If failed.
*/
@Test
- public void testExceptMerge() throws Exception {
- String sql = "" +
+ public void testSetOpMerge() throws Exception {
+ String sql =
"SELECT * FROM random_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM random_tbl2 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM affinity_tbl1 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM affinity_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class)
- .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce)
+ .and(hasChildThat(isInstanceOf(setOp.map)
.and(input(0, isTableScan("random_tbl1")))
.and(input(1, isTableScan("random_tbl2")))
.and(input(2, isTableScan("affinity_tbl1")))
@@ -300,18 +343,18 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptAllMerge() throws Exception {
- String sql = "" +
+ public void testSetOpAllMerge() throws Exception {
+ String sql =
"SELECT * FROM random_tbl1 " +
- "EXCEPT ALL " +
+ setOpAll() +
"SELECT * FROM random_tbl2 " +
- "EXCEPT ALL " +
+ setOpAll() +
"SELECT * FROM affinity_tbl1 " +
- "EXCEPT ALL " +
+ setOpAll() +
"SELECT * FROM affinity_tbl2 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class).and(n -> n.all)
- .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce).and(IgniteSetOp::all)
+ .and(hasChildThat(isInstanceOf(setOp.map)
.and(input(0, isTableScan("random_tbl1")))
.and(input(1, isTableScan("random_tbl2")))
.and(input(2, isTableScan("affinity_tbl1")))
@@ -324,20 +367,66 @@ public class ExceptPlannerTest extends AbstractPlannerTest {
* @throws Exception If failed.
*/
@Test
- public void testExceptAllWithExceptMerge() throws Exception {
- String sql = "" +
+ public void testSetOpAllWithExceptMerge() throws Exception {
+ String sql =
"SELECT * FROM random_tbl1 " +
- "EXCEPT ALL " +
+ setOpAll() +
"SELECT * FROM random_tbl2 " +
- "EXCEPT " +
+ setOp() +
"SELECT * FROM affinity_tbl1 ";
- assertPlan(sql, publicSchema, isInstanceOf(IgniteReduceMinus.class).and(n -> !n.all)
- .and(hasChildThat(isInstanceOf(IgniteMapMinus.class)
+ assertPlan(sql, publicSchema, isInstanceOf(setOp.reduce).and(n -> !n.all())
+ .and(hasChildThat(isInstanceOf(setOp.map)
.and(input(0, isTableScan("random_tbl1")))
.and(input(1, isTableScan("random_tbl2")))
.and(input(2, isTableScan("affinity_tbl1")))
))
);
}
+
+ /** */
+ private String setOp() {
+ return setOp.name() + ' ';
+ }
+
+ /** */
+ private String setOpAll() {
+ return setOp.name() + " ALL ";
+ }
+
+ /** */
+ enum SetOp {
+ /** */
+ EXCEPT(
+ IgniteSingleMinus.class,
+ IgniteMapMinus.class,
+ IgniteReduceMinus.class
+ ),
+
+ /** */
+ INTERSECT(
+ IgniteSingleIntersect.class,
+ IgniteMapIntersect.class,
+ IgniteReduceIntersect.class
+ );
+
+ /** */
+ public final Class<? extends IgniteSingleSetOp> single;
+
+ /** */
+ public final Class<? extends IgniteMapSetOp> map;
+
+ /** */
+ public final Class<? extends IgniteReduceSetOp> reduce;
+
+ /** */
+ SetOp(
+ Class<? extends IgniteSingleSetOp> single,
+ Class<? extends IgniteMapSetOp> map,
+ Class<? extends IgniteReduceSetOp> reduce) {
+ this.single = single;
+ this.map = map;
+ this.reduce = reduce;
+ }
+ }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionPlannerTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionPlannerTest.java
index fdd026d..227a95b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableFunctionPlannerTest.java
@@ -33,7 +33,7 @@ import org.junit.Test;
/**
* Test table functions.
*/
-public class TableFunctionTest extends AbstractPlannerTest {
+public class TableFunctionPlannerTest extends AbstractPlannerTest {
/** Public schema. */
private IgniteSchema publicSchema;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index 877228e..3b4ad0b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTes
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateSingleGroupExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashIndexSpoolExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IntersectExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
@@ -48,6 +49,7 @@ import org.junit.runners.Suite;
HashAggregateSingleGroupExecutionTest.class,
SortAggregateExecutionTest.class,
MinusExecutionTest.class,
+ IntersectExecutionTest.class,
RuntimeTreeIndexTest.class,
})
public class ExecutionTestSuite {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index fd9e0a1..a22299c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.Aggregate
import org.apache.ignite.internal.processors.query.calcite.integration.CalciteErrorHandlilngIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.IndexSpoolIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.SetOpIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.SortAggregateIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.TableDdlIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.TableDmlIntegrationTest;
@@ -61,6 +62,7 @@ import org.junit.runners.Suite;
TableDmlIntegrationTest.class,
DataTypesTest.class,
IndexSpoolIntegrationTest.class,
+ SetOpIntegrationTest.class,
})
public class IntegrationTestSuite {
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 47136f3..14f3263 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -20,15 +20,15 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.SetOpPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.TableDmlPlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.planner.TableFunctionTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.TableFunctionPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -48,8 +48,8 @@ import org.junit.runners.Suite;
HashAggregatePlannerTest.class,
SortAggregatePlannerTest.class,
JoinColocationPlannerTest.class,
- ExceptPlannerTest.class,
- TableFunctionTest.class,
+ SetOpPlannerTest.class,
+ TableFunctionPlannerTest.class,
TableDmlPlannerTest.class,
})
public class PlannerTestSuite {