You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/01/15 09:59:03 UTC
[ignite] branch sql-calcite updated: IGNITE-13545 Calcite
integration. Index spool (#8596)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new f55f922 IGNITE-13545 Calcite integration. Index spool (#8596)
f55f922 is described below
commit f55f922c5c21667ae9e5cc3fb1c9491d17c38c3c
Author: tledkov <tl...@gridgain.com>
AuthorDate: Fri Jan 15 12:58:47 2021 +0300
IGNITE-13545 Calcite integration. Index spool (#8596)
---
.../query/calcite/exec/AbstractIndexScan.java | 166 ++++++
.../processors/query/calcite/exec/IndexScan.java | 145 ++---
.../query/calcite/exec/LogicalRelImplementor.java | 39 +-
.../query/calcite/exec/RuntimeTreeIndex.java | 203 +++++++
.../{TableSpoolNode.java => IndexSpoolNode.java} | 152 +++--
.../query/calcite/exec/rel/TableSpoolNode.java | 2 +-
.../query/calcite/metadata/IgniteMdRowCount.java | 10 +
.../calcite/metadata/IgniteMdSelectivity.java | 15 +
.../processors/query/calcite/prepare/Cloner.java | 16 +-
.../processors/query/calcite/prepare/Fragment.java | 6 +-
.../query/calcite/prepare/FragmentSplitter.java | 1 +
.../query/calcite/prepare/IgniteRelShuttle.java | 6 +
.../query/calcite/prepare/PlannerPhase.java | 4 +-
.../processors/query/calcite/prepare/Splitter.java | 6 +-
.../query/calcite/rel/AbstractIndexScan.java | 73 +--
.../query/calcite/rel/IgniteAggregate.java | 4 +-
.../rel/IgniteCorrelatedNestedLoopJoin.java | 59 +-
.../query/calcite/rel/IgniteIndexScan.java | 25 +-
.../query/calcite/rel/IgniteIndexSpool.java | 150 +++++
.../processors/query/calcite/rel/IgniteLimit.java | 13 +
.../query/calcite/rel/IgniteMergeJoin.java | 70 +--
.../query/calcite/rel/IgniteRelVisitor.java | 5 +
.../query/calcite/rel/IgniteTrimExchange.java | 5 +-
.../rel/ProjectableFilterableTableScan.java | 2 +-
.../rel/logical/IgniteLogicalIndexScan.java | 44 +-
.../rel/logical/IgniteLogicalTableScan.java | 5 +-
.../calcite/rule/CorrelatedNestedLoopJoinRule.java | 18 +-
.../query/calcite/rule/FilterSpoolMergeRule.java | 116 ++++
.../calcite/rule/LogicalScanConverterRule.java | 29 +-
.../calcite/rule/logical/ProjectScanMergeRule.java | 57 +-
.../query/calcite/schema/IgniteIndex.java | 4 +-
.../query/calcite/schema/TableDescriptorImpl.java | 8 +-
.../processors/query/calcite/trait/TraitUtils.java | 14 +
.../processors/query/calcite/util/Commons.java | 51 +-
.../query/calcite/util/IndexConditions.java | 141 +++++
.../processors/query/calcite/util/RexUtils.java | 56 +-
...oolTest.java => IndexSpoolIntegrationTest.java} | 15 +-
.../query/calcite/exec/RuntimeTreeIndexTest.java | 196 ++++++
.../calcite/exec/rel/AbstractExecutionTest.java | 100 ++++
.../query/calcite/exec/rel/ExecutionTest.java | 144 -----
.../calcite/exec/rel/IndexSpoolExecutionTest.java | 191 ++++++
.../calcite/exec/rel/TableSpoolExecutionTest.java | 104 ++++
.../query/calcite/planner/AbstractPlannerTest.java | 654 +++++++++++++++++++++
.../CorrelatedNestedLoopJoinPlannerTest.java | 172 ++++++
.../calcite/planner/IndexSpoolPlannerTest.java | 267 +++++++++
.../query/calcite/{ => planner}/PlannerTest.java | 549 +----------------
.../calcite/planner/TableSpoolPlannerTest.java | 138 +++++
...lciteTestSuite.java => ExecutionTestSuite.java} | 32 +-
.../ignite/testsuites/IgniteCalciteTestSuite.java | 12 +-
.../apache/ignite/testsuites/PlannerTestSuite.java | 38 ++
50 files changed, 3227 insertions(+), 1105 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
new file mode 100644
index 0000000..096e4ff
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.query.GridIndex;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Abstract index scan: iterates index rows between optional bounds, applying filters and a row transformer.
+ */
+public abstract class AbstractIndexScan<Row, IdxRow> implements Iterable<Row>, AutoCloseable {
+ /** Physical index to scan. */
+ private final GridIndex<IdxRow> idx;
+
+ /** Additional filters; {@code null} means that no row is filtered out. */
+ private final Predicate<Row> filters;
+
+ /** Lower index scan bound; when {@code null}, a {@code null} lower row is passed to the index. */
+ private final Supplier<Row> lowerBound;
+
+ /** Upper index scan bound; when {@code null}, a {@code null} upper row is passed to the index. */
+ private final Supplier<Row> upperBound;
+
+ /** Transformation applied to every row that passed the filters; {@code null} means rows are returned as is. */
+ private final Function<Row, Row> rowTransformer;
+
+ /** Execution context. */
+ protected final ExecutionContext<Row> ectx;
+
+ /** Row type supplied by the subclass constructor. */
+ protected final RelDataType rowType;
+
+ /**
+ * @param ectx Execution context.
+ * @param idx Physical index.
+ * @param filters Additional filters.
+ * @param lowerBound Lower index scan bound.
+ * @param upperBound Upper index scan bound.
+ */
+ protected AbstractIndexScan(
+ ExecutionContext<Row> ectx,
+ RelDataType rowType,
+ GridIndex<IdxRow> idx,
+ Predicate<Row> filters,
+ Supplier<Row> lowerBound,
+ Supplier<Row> upperBound,
+ Function<Row, Row> rowTransformer
+ ) {
+ this.ectx = ectx;
+ this.rowType = rowType;
+ this.idx = idx;
+ this.filters = filters;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ this.rowTransformer = rowTransformer;
+ }
+
+ /** {@inheritDoc} Every call evaluates the bound suppliers anew and opens a new index cursor. */
+ @Override public synchronized Iterator<Row> iterator() {
+ IdxRow lower = lowerBound == null ? null : row2indexRow(lowerBound.get());
+ IdxRow upper = upperBound == null ? null : row2indexRow(upperBound.get());
+
+ return new IteratorImpl(idx.find(lower, upper, filterClosure()));
+ }
+
+ /** Converts a row produced by a bound supplier into the index row form passed to the index lookup. */
+ protected abstract IdxRow row2indexRow(Row bound);
+
+ /** Converts an index row returned by the cursor into a row that is filtered, transformed and returned. */
+ protected abstract Row indexRow2Row(IdxRow idxRow) throws IgniteCheckedException;
+
+ /** Provides the tree row closure passed to the index lookup as its third argument. */
+ protected abstract BPlusTree.TreeRowClosure<IdxRow, IdxRow> filterClosure();
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // No-op.
+ }
+
+ /** Iterator over an index cursor that looks one row ahead, applying the filters and the row transformer. */
+ private class IteratorImpl extends GridIteratorAdapter<Row> {
+ /** Index cursor the rows are read from. */
+ private final GridCursor<IdxRow> cursor;
+
+ /** Next element. */
+ private Row next;
+
+ /** @param cursor Index cursor to iterate over. */
+ private IteratorImpl(@NotNull GridCursor<IdxRow> cursor) {
+ this.cursor = cursor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNextX() throws IgniteCheckedException {
+ advance();
+
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row nextX() throws IgniteCheckedException {
+ advance();
+
+ if (next == null)
+ throw new NoSuchElementException();
+
+ Row res = next;
+
+ next = null;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeX() {
+ throw new UnsupportedOperationException("Remove is not supported.");
+ }
+
+ /** Reads the cursor until a row passes the filters and stores it in {@code next}; no-op if already set. */
+ private void advance() throws IgniteCheckedException {
+ assert cursor != null;
+
+ if (next != null)
+ return;
+
+ while (next == null && cursor.next()) {
+ IdxRow idxRow = cursor.get();
+
+ Row r = indexRow2Row(idxRow);
+
+ if (filters != null && !filters.test(r))
+ continue;
+
+ if (rowTransformer != null)
+ r = rowTransformer.apply(r);
+
+ next = r;
+ }
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index fbacfde..b9d3c9d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -20,11 +20,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
-import org.apache.calcite.rel.type.RelDataType;
+
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -47,20 +46,17 @@ import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeFilterClosure;
import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRow;
import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
-import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.h2.value.DataType;
import org.h2.value.Value;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* Scan on index.
*/
-public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
+public class IndexScan<Row> extends AbstractIndexScan<Row, H2Row> {
/** */
private final GridKernalContext kctx;
@@ -68,9 +64,6 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
private final GridCacheContext<?, ?> cctx;
/** */
- private final ExecutionContext<Row> ectx;
-
- /** */
private final CacheObjectContext coCtx;
/** */
@@ -80,20 +73,8 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
private final RowFactory<Row> factory;
/** */
- private final GridIndex<H2Row> idx;
-
- /** */
private final AffinityTopologyVersion topVer;
- /** Additional filters. */
- private final Predicate<Row> filters;
-
- /** Lower index scan bound. */
- private final Supplier<Row> lowerBound;
-
- /** Upper index scan bound. */
- private final Supplier<Row> upperBound;
-
/** */
private final int[] parts;
@@ -104,10 +85,7 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
private volatile List<GridDhtLocalPartition> reserved;
/** */
- private final Function<Row, Row> rowTransformer;
-
- /** */
- private final ImmutableBitSet requiredColunms;
+ private final ImmutableBitSet requiredColumns;
/**
* @param ectx Execution context.
@@ -126,36 +104,36 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
Supplier<Row> lowerBound,
Supplier<Row> upperBound,
Function<Row, Row> rowTransformer,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable ImmutableBitSet requiredColumns
) {
- this.ectx = ectx;
+ super(
+ ectx,
+ desc.rowType(ectx.getTypeFactory(), requiredColumns),
+ idx,
+ filters,
+ lowerBound,
+ upperBound,
+ rowTransformer
+ );
+
this.desc = desc;
cctx = desc.cacheContext();
kctx = cctx.kernalContext();
coCtx = cctx.cacheObjectContext();
- RelDataType rowType = desc.rowType(this.ectx.getTypeFactory(), requiredColunms);
-
- factory = this.ectx.rowHandler().factory(this.ectx.getTypeFactory(), rowType);
- this.idx = idx;
+ factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
topVer = ectx.planningContext().topologyVersion();
- this.filters = filters;
- this.lowerBound = lowerBound;
- this.upperBound = upperBound;
this.parts = parts;
mvccSnapshot = ectx.mvccSnapshot();
- this.rowTransformer = rowTransformer;
- this.requiredColunms = requiredColunms;
+ this.requiredColumns = requiredColumns;
}
/** {@inheritDoc} */
@Override public synchronized Iterator<Row> iterator() {
reserve();
- try {
- H2Row lower = lowerBound == null ? null : new H2PlainRow(values(coCtx, ectx, lowerBound.get()));
- H2Row upper = upperBound == null ? null : new H2PlainRow(values(coCtx, ectx, upperBound.get()));
- return new IteratorImpl(idx.find(lower, upper, filterClosure()));
+ try {
+ return super.iterator();
}
catch (Exception e) {
release();
@@ -164,6 +142,16 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
}
}
+ /** {@inheritDoc} */
+ @Override protected H2Row row2indexRow(Row bound) {
+ return new H2PlainRow(values(coCtx, ectx, bound));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Row indexRow2Row(H2Row row) throws IgniteCheckedException {
+ return desc.toRow(ectx, (CacheDataRow)row, factory, requiredColumns);
+ }
+
/** */
@Override public void close() {
release();
@@ -213,12 +201,17 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
try {
for (GridDhtLocalPartition part : toReserve) {
- if (part == null || !part.reserve())
- throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology.");
+ if (part == null || !part.reserve()) {
+ throw new ClusterTopologyException(
+ "Failed to reserve partition for query execution. Retry on stable topology."
+ );
+ }
else if (part.state() != GridDhtPartitionState.OWNING) {
part.release();
- throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology.");
+ throw new ClusterTopologyException(
+ "Failed to reserve partition for query execution. Retry on stable topology."
+ );
}
reserved.add(part);
@@ -245,8 +238,8 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
reserved = null;
}
- /** */
- private H2TreeFilterClosure filterClosure() {
+ /** {@inheritDoc} */
+ @Override protected H2TreeFilterClosure filterClosure() {
IndexingQueryFilter filter = new IndexingQueryFilterImpl(kctx, topVer, parts);
IndexingQueryCacheFilter f = filter.forCache(cctx.name());
H2TreeFilterClosure filterC = null;
@@ -277,66 +270,4 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
throw new IgniteException("Failed to wrap object into H2 Value.", e);
}
}
-
- /** */
- private class IteratorImpl extends GridIteratorAdapter<Row> {
- /** */
- private final GridCursor<H2Row> cursor;
-
- /** Next element. */
- private Row next;
-
- /** */
- public IteratorImpl(@NotNull GridCursor<H2Row> cursor) {
- this.cursor = cursor;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNextX() throws IgniteCheckedException {
- advance();
-
- return next != null;
- }
-
- /** {@inheritDoc} */
- @Override public Row nextX() throws IgniteCheckedException {
- advance();
-
- if (next == null)
- throw new NoSuchElementException();
-
- Row res = next;
-
- next = null;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void removeX() {
- throw new UnsupportedOperationException("Remove is not supported.");
- }
-
- /** */
- private void advance() throws IgniteCheckedException {
- assert cursor != null;
-
- if (next != null)
- return;
-
- while (next == null && cursor.next()) {
- H2Row h2Row = cursor.get();
-
- Row r = desc.toRow(ectx, (CacheDataRow)h2Row, factory, requiredColunms);
-
- if (filters != null && !filters.test(r))
- continue;
-
- if (rowTransformer != null)
- r = rowTransformer.apply(r);
-
- next = r;
- }
- }
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index e39349f..671e481 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNod
import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ModifyNode;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
@@ -165,7 +167,6 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteTrimExchange rel) {
assert TraitUtils.distribution(rel).getType() == HASH_DISTRIBUTED;
- assert TraitUtils.distribution(rel.getInput()).getType() == BROADCAST_DISTRIBUTED;
IgniteDistribution distr = rel.distribution();
Destination<Row> dest = distr.destination(ctx, affSrvc, ctx.group(rel.sourceId()));
@@ -264,11 +265,11 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
IgniteTypeFactory typeFactory = ctx.getTypeFactory();
- ImmutableBitSet requiredColunms = rel.requiredColumns();
+ ImmutableBitSet requiredColumns = rel.requiredColumns();
List<RexNode> lowerCond = rel.lowerBound();
List<RexNode> upperCond = rel.upperBound();
- RelDataType rowType = tbl.getRowType(typeFactory, requiredColunms);
+ RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
Supplier<Row> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
@@ -279,7 +280,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
CollocationGroup group = ctx.group(rel.sourceId());
- Iterable<Row> rowsIter = idx.scan(ctx, group, filters, lower, upper, prj, requiredColunms);
+ Iterable<Row> rowsIter = idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
return new ScanNode<>(ctx, rowType, rowsIter);
}
@@ -364,6 +365,36 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
}
/** {@inheritDoc} */
+ @Override public Node<Row> visit(IgniteIndexSpool rel) {
+ RelCollation collation = rel.collation();
+
+ assert rel.indexCondition() != null : rel;
+
+ List<RexNode> lowerBound = rel.indexCondition().lowerBound();
+ List<RexNode> upperBound = rel.indexCondition().upperBound();
+
+ Predicate<Row> filter = expressionFactory.predicate(rel.condition(), rel.getRowType());
+ Supplier<Row> lower = lowerBound == null ? null : expressionFactory.rowSource(lowerBound);
+ Supplier<Row> upper = upperBound == null ? null : expressionFactory.rowSource(upperBound);
+
+ IndexSpoolNode<Row> node = new IndexSpoolNode<>(
+ ctx,
+ rel.getRowType(),
+ collation,
+ expressionFactory.comparator(collation),
+ filter,
+ lower,
+ upper
+ );
+
+ Node<Row> input = visit(rel.getInput());
+
+ node.register(input);
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
@Override public Node<Row> visit(IgniteTableModify rel) {
switch (rel.getOperation()) {
case INSERT:
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndex.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndex.java
new file mode 100644
index 0000000..cfd5669
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndex.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.query.GridIndex;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Runtime sorted index based on on-heap tree.
+ */
+public class RuntimeTreeIndex<Row> implements GridIndex<Row>, AutoCloseable {
+ /** Execution context; its row handler is used to read bound columns. */
+ protected final ExecutionContext<Row> ectx;
+
+ /** Comparator that orders the keys of the underlying tree map. */
+ protected final Comparator<Row> comp;
+
+ /** Collation. */
+ private final RelCollation collation;
+
+ /** Rows grouped by key: each entry holds all pushed rows that compare equal under {@code comp}. */
+ private TreeMap<Row, List<Row>> rows;
+
+ /**
+ * Creates an empty index ordered by {@code comp}; {@code collation} must not be {@code null} (asserted).
+ */
+ public RuntimeTreeIndex(
+ ExecutionContext<Row> ectx,
+ RelCollation collation,
+ Comparator<Row> comp
+ ) {
+ this.ectx = ectx;
+ this.comp = comp;
+
+ assert Objects.nonNull(collation);
+
+ this.collation = collation;
+ rows = new TreeMap<>(comp);
+ }
+
+ /**
+ * Add row to index. Rows that compare equal under the comparator share one list, in insertion order.
+ */
+ public void push(Row r) {
+ List<Row> newEqRows = new ArrayList<>();
+
+ List<Row> eqRows = rows.putIfAbsent(r, newEqRows);
+
+ if (eqRows != null)
+ eqRows.add(r);
+ else
+ newEqRows.add(r);
+ }
+
+ /** Removes all rows from the index. */
+ @Override public void close() {
+ rows.clear();
+ }
+
+ /** {@inheritDoc} Bounds are inclusive; a bound whose first collation column is null is treated as absent. */
+ @Override public GridCursor<Row> find(Row lower, Row upper, BPlusTree.TreeRowClosure<Row, Row> filterC) {
+ assert filterC == null;
+
+ int firstCol = F.first(collation.getKeys());
+
+ if (ectx.rowHandler().get(firstCol, lower) != null && ectx.rowHandler().get(firstCol, upper) != null)
+ return new Cursor(rows.subMap(lower, true, upper, true));
+ else if (ectx.rowHandler().get(firstCol, lower) == null && ectx.rowHandler().get(firstCol, upper) != null)
+ return new Cursor(rows.headMap(upper, true));
+ else if (ectx.rowHandler().get(firstCol, lower) != null && ectx.rowHandler().get(firstCol, upper) == null)
+ return new Cursor(rows.tailMap(lower, true));
+ else
+ return new Cursor(rows);
+ }
+
+ /**
+ * Return an iterable to scan index range from lower to upper bounds inclusive, filtered by {@code filter}
+ * predicate. NOTE(review): find() reads both bound rows, so null bound suppliers look unsupported - confirm.
+ */
+ public Iterable<Row> scan(
+ ExecutionContext<Row> ectx,
+ RelDataType rowType,
+ Predicate<Row> filter,
+ Supplier<Row> lowerBound,
+ Supplier<Row> upperBound
+ ) {
+ return new IndexScan(rowType, this, filter, lowerBound, upperBound);
+ }
+
+ /**
+ * Cursor over a range of the index that flattens the per-key row lists into a single sequence of rows.
+ */
+ private class Cursor implements GridCursor<Row> {
+ /** Sub map iterator. */
+ private final Iterator<Map.Entry<Row, List<Row>>> mapIt;
+
+ /** Iterator over rows with equal index keys. */
+ private Iterator<Row> listIt;
+
+ /** Current row, returned by {@code get()}. */
+ private Row row;
+
+ /** @param subMap Range of the index to iterate over. */
+ Cursor(SortedMap<Row, List<Row>> subMap) {
+ mapIt = subMap.entrySet().iterator();
+ listIt = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() throws IgniteCheckedException {
+ if (!hasNext())
+ return false;
+
+ next0();
+
+ return true;
+ }
+
+ /** @return {@code true} if the current row list or the map iterator still has elements. */
+ private boolean hasNext() {
+ return listIt != null && listIt.hasNext() || mapIt.hasNext();
+ }
+
+ /** Moves to the next row, switching to the next key's row list when the current list is exhausted. */
+ private void next0() {
+ if (listIt == null || !listIt.hasNext())
+ listIt = mapIt.next().getValue().iterator();
+
+ row = listIt.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row get() throws IgniteCheckedException {
+ return row;
+ }
+ }
+
+ /**
+ * Scan over the runtime index: index rows and result rows are the same objects, so both conversions are identity.
+ */
+ private class IndexScan extends AbstractIndexScan<Row, Row> {
+ /**
+ * @param rowType Row type.
+ * @param idx Physical index.
+ * @param filter Additional filters.
+ * @param lowerBound Lower index scan bound.
+ * @param upperBound Upper index scan bound.
+ */
+ IndexScan(
+ RelDataType rowType,
+ GridIndex<Row> idx,
+ Predicate<Row> filter,
+ Supplier<Row> lowerBound,
+ Supplier<Row> upperBound) {
+ super(RuntimeTreeIndex.this.ectx, rowType, idx, filter, lowerBound, upperBound, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Row row2indexRow(Row bound) {
+ return bound;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Row indexRow2Row(Row row) {
+ return row;
+ }
+
+ /** @return Always {@code null}: the runtime index lookup asserts that no filter closure is passed. */
+ @Override protected BPlusTree.TreeRowClosure<Row, Row> filterClosure() {
+ return null;
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
similarity index 53%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
index 4750998..d8745f6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
@@ -17,47 +17,74 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Comparator;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeTreeIndex;
import org.apache.ignite.internal.util.typedef.F;
/**
- * Table spool node.
+ * Index spool node.
*/
-public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
- /** How many rows are requested by downstream. */
- private int requested;
-
- /** How many rows are we waiting for from the upstream. {@code -1} means end of source stream. */
- private int waiting;
+public class IndexSpoolNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
+ /** Scan. */
+ private final ScanNode<Row> scan;
- /** Index of the current row to push. */
- private int rowIdx;
+ /** Runtime index */
+ private final RuntimeTreeIndex<Row> idx;
- /** Rows buffer. */
- private final List<Row> rows;
+ /** */
+ private int requested;
- /**
- * Flag indicates that spool pushes row to downstream.
- * Need to check a case when a downstream produces requests on push.
- */
- private boolean inLoop;
+ /** */
+ private int waiting;
/**
* @param ctx Execution context.
*/
- public TableSpoolNode(ExecutionContext<Row> ctx, RelDataType rowType) {
+ public IndexSpoolNode(
+ ExecutionContext<Row> ctx,
+ RelDataType rowType,
+ RelCollation collation,
+ Comparator<Row> comp,
+ Predicate<Row> filter,
+ Supplier<Row> lowerIdxBound,
+ Supplier<Row> upperIdxBound
+ ) {
super(ctx, rowType);
- rows = new ArrayList<>();
+ idx = new RuntimeTreeIndex<>(ctx, collation, comp);
+
+ scan = new ScanNode<>(
+ ctx,
+ rowType,
+ idx.scan(
+ ctx,
+ rowType,
+ filter,
+ lowerIdxBound,
+ upperIdxBound
+ )
+ );
+ }
+
+ /** */
+ @Override public void onRegister(Downstream<Row> downstream) {
+ scan.onRegister(downstream);
+ }
+
+ /** */
+ @Override public Downstream<Row> downstream() {
+ return scan.downstream();
}
/** {@inheritDoc} */
@Override protected void rewindInternal() {
- requested = 0;
- rowIdx = 0;
+ scan.rewind();
}
/** {@inheritDoc} */
@@ -81,12 +108,13 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
try {
checkState();
- requested += rowsCnt;
+ if (!indexReady()) {
+ requested = rowsCnt;
- if (waiting == -1 && !inLoop)
- context().execute(this::doPush);
- else if (waiting == 0)
- source().request(waiting = IN_BUFFER_SIZE);
+ requestSource();
+ }
+ else
+ scan.request(rowsCnt);
}
catch (Exception e) {
onError(e);
@@ -94,49 +122,23 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
}
/** */
- private void doPush() {
- if (rowIdx >= rows.size() && waiting == -1 && requested > 0) {
- downstream().end();
-
- return;
- }
-
- while (requested > 0 && rowIdx < rows.size())
- pushToDownstream();
- }
-
- /** */
- private void pushToDownstream() {
- inLoop = true;
+ private void requestSource() {
+ waiting = IN_BUFFER_SIZE;
- downstream().push(rows.get(rowIdx));
-
- inLoop = false;
-
- rowIdx++;
- requested--;
-
- if (rowIdx >= rows.size() && waiting == -1 && requested > 0)
- downstream().end();
+ source().request(IN_BUFFER_SIZE);
}
/** {@inheritDoc} */
@Override public void push(Row row) {
- assert downstream() != null;
- assert waiting > 0;
-
try {
checkState();
- waiting--;
+ idx.push(row);
- rows.add(row);
+ waiting--;
if (waiting == 0)
- source().request(waiting = IN_BUFFER_SIZE);
-
- if (requested > 0 && rowIdx < rows.size())
- pushToDownstream();
+ context().execute(this::requestSource);
}
catch (Exception e) {
onError(e);
@@ -145,19 +147,39 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
/** {@inheritDoc} */
@Override public void end() {
- assert downstream() != null;
- assert waiting > 0;
-
try {
checkState();
waiting = -1;
- if (rowIdx >= rows.size() && requested > 0)
- downstream().end();
+ scan.request(requested);
}
catch (Exception e) {
- downstream().onError(e);
+ scan.downstream().onError(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void closeInternal() {
+ try {
+ scan.close();
+ }
+ catch (Exception ex) {
+ onError(ex);
+ }
+
+ try {
+ idx.close();
}
+ catch (Exception ex) {
+ onError(ex);
+ }
+
+ super.closeInternal();
+ }
+
+ /** */
+ private boolean indexReady() {
+ return waiting == -1;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
index 4750998..d1ad00a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
@@ -83,7 +83,7 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
requested += rowsCnt;
- if (waiting == -1 && !inLoop)
+ if ((waiting == -1 || rowIdx < rows.size()) && !inLoop)
context().execute(this::doPush);
else if (waiting == 0)
source().request(waiting = IN_BUFFER_SIZE);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 5192d92..57cc7b2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -30,6 +30,7 @@ import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
@@ -104,4 +105,13 @@ public class IgniteMdRowCount extends RelMdRowCount {
return rowsCount;
}
+
+ /**
+ * By default, the row count of a Spool equals the estimated row count of its child,
+ * but IndexSpool has an internal filter that can filter out some rows,
+ * hence we need to estimate it differently.
+ */
+ public double getRowCount(IgniteIndexSpool rel, RelMetadataQuery mq) {
+ return rel.estimateRowCount(mq);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSelectivity.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSelectivity.java
index 1de7f26..624466c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSelectivity.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSelectivity.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
import java.util.List;
+
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdSelectivity;
@@ -30,6 +31,7 @@ import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -91,4 +93,17 @@ public class IgniteMdSelectivity extends RelMdSelectivity {
RexNode diff = RelMdUtil.minusPreds(RexUtils.builder(rel), predicate, condition);
return RelMdUtil.guessSelectivity(diff);
}
+
+ /** */
+ public Double getSelectivity(IgniteIndexSpool rel, RelMetadataQuery mq, RexNode predicate) {
+ if (predicate != null) {
+ return mq.getSelectivity(rel.getInput(),
+ RelMdUtil.minusPreds(
+ rel.getCluster().getRexBuilder(),
+ predicate,
+ rel.condition()));
+ }
+
+ return mq.getSelectivity(rel.getInput(), rel.condition());
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index bc52fa8..34e1ee7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
@@ -45,13 +46,14 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
/** */
-class Cloner implements IgniteRelVisitor<IgniteRel> {
+public class Cloner implements IgniteRelVisitor<IgniteRel> {
/** */
private final RelOptCluster cluster;
/** */
private ImmutableList.Builder<IgniteReceiver> remotes;
+ /** */
Cloner(RelOptCluster cluster) {
this.cluster = cluster;
}
@@ -77,6 +79,13 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
}
/** */
+ public static IgniteRel clone(IgniteRel r) {
+ Cloner c = new Cloner(r.getCluster());
+
+ return c.visit(r);
+ }
+
+ /** */
private IgniteReceiver collect(IgniteReceiver receiver) {
if (remotes != null)
remotes.add(receiver);
@@ -158,6 +167,11 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteIndexSpool rel) {
+ return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteLimit rel) {
return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 9db3bf1..4b213e6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.metadata.CollocationMappingException;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
@@ -34,6 +35,8 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -50,6 +53,7 @@ public class Fragment {
private final IgniteRel root;
/** Serialized root representation. */
+ @GridToStringExclude
private final String rootSer;
/** */
@@ -183,6 +187,6 @@ public class Fragment {
/** {@inheritDoc} */
@Override public String toString() {
- return rootSer;
+ return S.toString(Fragment.class, this, "root", RelOptUtil.toString(root));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
index 83fe9e7..bf55da8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -46,6 +46,7 @@ public class FragmentSplitter extends IgniteRelShuttle {
/** */
private FragmentProto curr;
+ /** */
public FragmentSplitter(RelNode cutPoint) {
this.cutPoint = cutPoint;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index c29e2e2..870962c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
@@ -145,6 +146,11 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteIndexSpool rel) {
+ return processNode(rel);
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteRel rel) {
return rel.accept(this);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index a18899d..768fc24 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -42,6 +42,7 @@ import org.apache.calcite.tools.RuleSets;
import org.apache.ignite.internal.processors.query.calcite.rule.AggregateConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.CorrelatedNestedLoopJoinRule;
import org.apache.ignite.internal.processors.query.calcite.rule.FilterConverterRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMergeRule;
import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.MergeJoinConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.NestedLoopJoinConverterRule;
@@ -165,7 +166,8 @@ public enum PlannerPhase {
FilterConverterRule.INSTANCE,
TableModifyConverterRule.INSTANCE,
UnionConverterRule.INSTANCE,
- SortConverterRule.INSTANCE
+ SortConverterRule.INSTANCE,
+ FilterSpoolMergeRule.INSTANCE
)
);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index f170a4b..735fa85 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
+
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
@@ -49,8 +50,11 @@ public class Splitter extends IgniteRelShuttle {
while (!stack.isEmpty()) {
curr = stack.pop();
+
curr.root = visit(curr.root);
+
res.add(curr.build());
+
curr = null;
}
@@ -81,7 +85,7 @@ public class Splitter extends IgniteRelShuttle {
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteTrimExchange rel) {
- return rel.clone(IdGenerator.nextId());
+ return ((IgniteTrimExchange)processNode(rel)).clone(IdGenerator.nextId());
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
index 5281757..7260e75 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
@@ -28,16 +28,12 @@ import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
import org.jetbrains.annotations.Nullable;
/**
@@ -48,16 +44,7 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
protected final String idxName;
/** */
- protected List<RexNode> lowerCond;
-
- /** */
- protected List<RexNode> upperCond;
-
- /** */
- protected List<RexNode> lowerBound;
-
- /** */
- protected List<RexNode> upperBound;
+ protected final IndexConditions idxCond;
/**
* Constructor used for deserialization.
@@ -67,8 +54,7 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
protected AbstractIndexScan(RelInput input) {
super(input);
idxName = input.getString("index");
- lowerBound = input.get("lower") == null ? null : input.getExpressionList("lower");
- upperBound = input.get("upper") == null ? null : input.getExpressionList("upper");
+ idxCond = new IndexConditions(input);
}
/** */
@@ -80,23 +66,21 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
String idxName,
@Nullable List<RexNode> proj,
@Nullable RexNode cond,
- @Nullable List<RexNode> lowerCond,
- @Nullable List<RexNode> upperCond,
- @Nullable ImmutableBitSet reqColunms
+ @Nullable IndexConditions idxCond,
+ @Nullable ImmutableBitSet reqColumns
) {
- super(cluster, traitSet, hints, table, proj, cond, reqColunms);
+ super(cluster, traitSet, hints, table, proj, cond, reqColumns);
+
this.idxName = idxName;
- this.lowerCond = lowerCond;
- this.upperCond = upperCond;
+ this.idxCond = idxCond;
}
/** {@inheritDoc} */
@Override protected RelWriter explainTerms0(RelWriter pw) {
pw = pw.item("index", idxName);
pw = super.explainTerms0(pw);
- return pw
- .itemIf("lower", lowerBound, !F.isEmpty(lowerBound()))
- .itemIf("upper", upperBound, !F.isEmpty(upperBound()));
+
+ return idxCond.explainTerms(pw);
}
/**
@@ -110,44 +94,28 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
* @return Lower index condition.
*/
public List<RexNode> lowerCondition() {
- return lowerCond;
+ return idxCond == null ? null : idxCond.lowerCondition();
}
/**
* @return Lower index condition.
*/
public List<RexNode> lowerBound() {
- if (lowerBound == null && lowerCond != null) {
- RelDataType rowType = getTable().getRowType();
- Mappings.TargetMapping mapping = null;
- if (requiredColumns() != null)
- mapping = Commons.inverceMapping(requiredColumns(), rowType.getFieldCount());
- lowerBound = RexUtils.asBound(getCluster(), lowerCond, rowType, mapping);
- }
-
- return lowerBound;
+ return idxCond == null ? null : idxCond.lowerBound();
}
/**
* @return Upper index condition.
*/
public List<RexNode> upperCondition() {
- return upperCond;
+ return idxCond == null ? null : idxCond.upperCondition();
}
/**
* @return Upper index condition.
*/
public List<RexNode> upperBound() {
- if (upperBound == null && upperCond != null) {
- RelDataType rowType = getTable().getRowType();
- Mappings.TargetMapping mapping = null;
- if (requiredColumns() != null)
- mapping = Commons.inverceMapping(requiredColumns(), rowType.getFieldCount());
- upperBound = RexUtils.asBound(getCluster(), upperCond, rowType, mapping);
- }
-
- return upperBound;
+ return idxCond == null ? null : idxCond.upperBound();
}
/** {@inheritDoc} */
@@ -163,16 +131,16 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
cost = 0;
- if (lowerCond != null) {
- double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, lowerCond));
+ if (lowerCondition() != null) {
+ double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, lowerCondition()));
selectivity -= 1 - selectivity0;
cost += Math.log(rows);
}
- if (upperCond != null) {
- double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, upperCond));
+ if (upperCondition() != null) {
+ double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, upperCondition()));
selectivity -= 1 - selectivity0;
}
@@ -187,4 +155,9 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
return planner.getCostFactory().makeCost(rows, cost, 0);
}
+
+ /** */
+ public IndexConditions indexConditions() {
+ return idxCond;
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
index f3eef52..e951b34 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
@@ -113,7 +113,7 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
ImmutableIntList keys = distribution.getKeys();
if (isSimple(this) && groupSet.cardinality() == keys.size()) {
- Mappings.TargetMapping mapping = Commons.inverceMapping(
+ Mappings.TargetMapping mapping = Commons.inverseMapping(
groupSet, getInput().getRowType().getFieldCount());
List<Integer> srcKeys = new ArrayList<>(keys.size());
@@ -188,7 +188,7 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
ImmutableIntList keys = distribution.getKeys();
if (groupSet.cardinality() == keys.size()) {
- Mappings.TargetMapping mapping = Commons.inverceMapping(
+ Mappings.TargetMapping mapping = Commons.inverseMapping(
groupSet, getInput().getRowType().getFieldCount());
IgniteDistribution outDistr = distribution.apply(mapping);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
index e270f12..da6cab5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -27,6 +27,7 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
@@ -43,6 +44,9 @@ import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrai
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.maxPrefix;
/**
* Relational expression that combines two relational expressions according to
@@ -92,21 +96,33 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
+ RelCollation leftCollation = TraitUtils.collation(left), rightCollation = TraitUtils.collation(right);
+
+ List<Integer> newRightCollationFields = maxPrefix(rightCollation.getKeys(), joinInfo.leftKeys);
+
+ if (F.isEmpty(newRightCollationFields))
+ return ImmutableList.of();
+
// We preserve left edge collation only if batch size == 1
if (variablesSet.size() == 1)
- return super.deriveCollation(nodeTraits, inputTraits);
-
- RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
+ nodeTraits = nodeTraits.replace(leftCollation);
+ else
+ nodeTraits = nodeTraits.replace(RelCollations.EMPTY);
- return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- ImmutableList.of(left.replace(RelCollations.EMPTY), right.replace(RelCollations.EMPTY))));
+ return ImmutableList.of(Pair.of(nodeTraits, inputTraits));
}
/** {@inheritDoc} */
- @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
// Correlated nested loop requires rewindable right edge.
-
RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
RewindabilityTrait rewindability = TraitUtils.rewindability(left);
@@ -116,15 +132,30 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
}
/** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // We preserve left edge collation only if batch size == 1
- if (variablesSet.size() == 1)
- return super.passThroughCollation(nodeTraits, inputTraits);
-
+ @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
+ // Index lookup (collation) is required for right input.
+ RelCollation rightReplace = RelCollations.of(joinInfo.rightKeys);
+
+ // We preserve left edge collation only if batch size == 1
+ if (variablesSet.size() == 1) {
+ Pair<RelTraitSet, List<RelTraitSet>> baseTraits = super.passThroughCollation(nodeTraits, inputTraits);
+
+ return Pair.of(
+ baseTraits.getKey(),
+ ImmutableList.of(
+ baseTraits.getValue().get(0),
+ baseTraits.getValue().get(1).replace(rightReplace)
+ )
+ );
+ }
+
return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
- ImmutableList.of(left.replace(RelCollations.EMPTY), right.replace(RelCollations.EMPTY)));
+ ImmutableList.of(left.replace(RelCollations.EMPTY), right.replace(rightReplace)));
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
index d5e24c4..d2dbe0c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
@@ -65,7 +66,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
RelTraitSet traits,
RelOptTable tbl,
String idxName) {
- this(cluster, traits, tbl, idxName, null, null, null, null, null);
+ this(cluster, traits, tbl, idxName, null, null, null, null);
}
/**
@@ -76,7 +77,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
* @param idxName Index name.
* @param proj Projects.
* @param cond Filters.
- * @param requiredColunms Participating colunms.
+ * @param requiredCols Participating columns.
*/
public IgniteIndexScan(
RelOptCluster cluster,
@@ -85,11 +86,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
String idxName,
@Nullable List<RexNode> proj,
@Nullable RexNode cond,
- @Nullable List<RexNode> lowerIdxCond,
- @Nullable List<RexNode> upperIdxCond,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable IndexConditions idxCond,
+ @Nullable ImmutableBitSet requiredCols
) {
- this(-1L, cluster, traits, tbl, idxName, proj, cond, lowerIdxCond, upperIdxCond, requiredColunms);
+ this(-1L, cluster, traits, tbl, idxName, proj, cond, idxCond, requiredCols);
}
/**
@@ -100,7 +100,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
* @param idxName Index name.
* @param proj Projects.
* @param cond Filters.
- * @param requiredColunms Participating colunms.
+ * @param requiredCols Participating columns.
*/
private IgniteIndexScan(
long sourceId,
@@ -110,11 +110,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
String idxName,
@Nullable List<RexNode> proj,
@Nullable RexNode cond,
- @Nullable List<RexNode> lowerIdxCond,
- @Nullable List<RexNode> upperIdxCond,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable IndexConditions idxCond,
+ @Nullable ImmutableBitSet requiredCols
) {
- super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, lowerIdxCond, upperIdxCond, requiredColunms);
+ super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, idxCond, requiredCols);
this.sourceId = sourceId;
}
@@ -138,12 +137,12 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
/** {@inheritDoc} */
@Override public IgniteRel clone(long sourceId) {
return new IgniteIndexScan(sourceId, getCluster(), getTraitSet(), getTable(),
- idxName, projects, condition, lowerCond, upperCond, requiredColumns);
+ idxName, projects, condition, idxCond, requiredColumns);
}
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteIndexScan(sourceId, cluster, getTraitSet(), getTable(),
- idxName, projects, condition, lowerCond, upperCond, requiredColumns);
+ idxName, projects, condition, idxCond, requiredColumns);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexSpool.java
new file mode 100644
index 0000000..dc9e062
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexSpool.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Spool;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
+
+/**
+ * Relational operator that returns the sorted contents of a table
+ * and allows looking up rows by the specified bounds.
+ */
+public class IgniteIndexSpool extends Spool implements IgniteRel {
+ /** */
+ private final RelCollation collation;
+
+ /** Index condition. */
+ private final IndexConditions idxCond;
+
+ /** Filters. */
+ protected final RexNode condition;
+
+ /** */
+ public IgniteIndexSpool(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode input,
+ RelCollation collation,
+ RexNode condition,
+ IndexConditions idxCond
+ ) {
+ super(cluster, traits, input, Type.LAZY, Type.EAGER);
+
+ assert Objects.nonNull(idxCond);
+ assert Objects.nonNull(condition);
+
+ this.idxCond = idxCond;
+ this.condition = condition;
+ this.collation = collation;
+ }
+
+ /**
+ * Constructor used for deserialization.
+ *
+ * @param input Serialized representation.
+ */
+ public IgniteIndexSpool(RelInput input) {
+ this(input.getCluster(),
+ input.getTraitSet().replace(IgniteConvention.INSTANCE),
+ input.getInputs().get(0),
+ input.getCollation(),
+ input.getExpression("condition"),
+ new IndexConditions(input)
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** */
+ @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+ return new IgniteIndexSpool(cluster, getTraitSet(), inputs.get(0), collation, condition, idxCond);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Spool copy(RelTraitSet traitSet, RelNode input, Type readType, Type writeType) {
+ return new IgniteIndexSpool(getCluster(), traitSet, input, collation, condition, idxCond);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isEnforcer() {
+ return true;
+ }
+
+ /** */
+ @Override public RelWriter explainTerms(RelWriter pw) {
+ RelWriter writer = super.explainTerms(pw);
+
+ writer.item("condition", condition);
+ writer.item("collation", collation);
+
+ return idxCond.explainTerms(writer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ double rowCnt = mq.getRowCount(getInput());
+ double bytesPerRow = getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+ double totalBytes = rowCnt * bytesPerRow;
+ double cpuCost = rowCnt * IgniteCost.ROW_PASS_THROUGH_COST;
+
+ if (idxCond.lowerCondition() != null)
+ cpuCost += Math.log(rowCnt) * IgniteCost.ROW_COMPARISON_COST;
+
+ IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+ return costFactory.makeCost(rowCnt, cpuCost, 0, totalBytes, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double estimateRowCount(RelMetadataQuery mq) {
+ return mq.getRowCount(getInput()) * mq.getSelectivity(this, null);
+ }
+
+ /** */
+ public IndexConditions indexCondition() {
+ return idxCond;
+ }
+
+ /** */
+ @Override public RelCollation collation() {
+ return collation;
+ }
+
+ /** */
+ public RexNode condition() {
+ return condition;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
index 534719d..bef81a9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
@@ -69,6 +70,18 @@ public class IgniteLimit extends SingleRel implements IgniteRel {
this.fetch = fetch;
}
+ /**
+ * Creates the node from a {@link RelInput}.
+ * The cluster, the trait set (with {@link IgniteConvention#INSTANCE} put in place) and the first input
+ * are taken from {@code input}; the offset and fetch expressions are read from the "offset" and "fetch"
+ * entries of {@code input}.
+ *
+ * @param input Source to read the node attributes from.
+ */
+ public IgniteLimit(RelInput input) {
+ super(
+ input.getCluster(),
+ input.getTraitSet().replace(IgniteConvention.INSTANCE),
+ input.getInputs().get(0)
+ );
+
+ offset = input.getExpression("offset");
+ fetch = input.getExpression("fetch");
+ }
+
/** {@inheritDoc} */
@Override public final IgniteLimit copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new IgniteLimit(getCluster(), traitSet, sole(inputs), offset, fetch);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
index 11a7216..a202604 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -32,8 +31,6 @@ import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
@@ -47,6 +44,10 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteC
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.createCollation;
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.isPrefix;
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.maxPrefix;
+
/** */
public class IgniteMergeJoin extends AbstractIgniteJoin {
/** */
@@ -98,7 +99,7 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
Map<Integer, Integer> leftToRight = joinInfo.pairs().stream()
.collect(Collectors.toMap(p -> p.source, p -> p.target));
- newRightCollation = newLeftCollation.stream().map(leftToRight::get).collect(Collectors.toList());
+ newRightCollation = newLeftCollation.subList(0, leftToRight.size()).stream().map(leftToRight::get).collect(Collectors.toList());
}
else if (isPrefix(rightCollation.getKeys(), joinInfo.rightKeys)) { // preserve right collation
newRightCollation = new ArrayList<>(rightCollation.getKeys());
@@ -106,7 +107,7 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
Map<Integer, Integer> rightToLeft = joinInfo.pairs().stream()
.collect(Collectors.toMap(p -> p.target, p -> p.source));
- newLeftCollation = newRightCollation.stream().map(rightToLeft::get).collect(Collectors.toList());
+ newLeftCollation = newRightCollation.subList(0, rightToLeft.size()).stream().map(rightToLeft::get).collect(Collectors.toList());
}
else { // generate new collations
// TODO: generate permutations when there will be multitraits
@@ -226,63 +227,4 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
return costFactory.makeCost(rows,
rows * (IgniteCost.ROW_COMPARISON_COST + IgniteCost.ROW_PASS_THROUGH_COST), 0);
}
-
- /**
- * Returns the longest possible prefix of {@code seq} that could be form from provided {@code elems}.
- *
- * @param seq Sequence.
- * @param elems Elems.
- * @return The longest possible prefix of {@code seq}.
- */
- private static <T> List<T> maxPrefix(List<T> seq, Collection<T> elems) {
- List<T> res = new ArrayList<>();
-
- Set<T> elems0 = new HashSet<>(elems);
-
- for (T e : seq) {
- if (!elems0.remove(e))
- break;
-
- res.add(e);
- }
-
- return res;
- }
-
- /**
- * Checks if there is a such permutation of all {@code elems} that is prefix of
- * provided {@code seq}.
- *
- * @param seq Sequence.
- * @param elems Elems.
- * @return {@code true} if there is a permutation of all {@code elems} that is prefix of {@code seq}.
- */
- private static <T> boolean isPrefix(List<T> seq, Collection<T> elems) {
- Set<T> elems0 = new HashSet<>(elems);
-
- if (seq.size() < elems0.size())
- return false;
-
- for (T e : seq) {
- if (!elems0.remove(e))
- return false;
-
- if (elems0.isEmpty())
- break;
- }
-
- return true;
- }
-
- /**
- * Creates collations from provided keys.
- *
- * @param keys The keys to create collation from.
- * @return New collation.
- */
- private static RelCollation createCollation(List<Integer> keys) {
- return RelCollations.of(
- keys.stream().map(RelFieldCollation::new).collect(Collectors.toList())
- );
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index b86596b..2db2cc1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -119,6 +119,11 @@ public interface IgniteRelVisitor<T> {
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
*/
+ T visit(IgniteIndexSpool rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
T visit(IgniteLimit rel);
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
index be16129..fae13ad 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
@@ -30,10 +30,8 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import static org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
@@ -55,14 +53,13 @@ public class IgniteTrimExchange extends Exchange implements SourceAwareIgniteRel
super(cluster, traits, input, distribution);
assert distribution.getType() == HASH_DISTRIBUTED;
- assert input.getTraitSet().getTrait(DistributionTraitDef.INSTANCE).getType() == BROADCAST_DISTRIBUTED;
this.sourceId = sourceId;
}
/** */
public IgniteTrimExchange(RelInput input) {
- super(changeTraits(input, IgniteConvention.INSTANCE));
+ super(changeTraits(input, IgniteConvention.INSTANCE, input.getDistribution()));
Object srcIdObj = input.get("sourceId");
if (srcIdObj != null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
index f484558..0747df2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
@@ -158,7 +158,7 @@ public abstract class ProjectableFilterableTableScan extends TableScan {
IgniteTypeFactory typeFactory = Commons.typeFactory(getCluster());
IgniteTable tbl = getTable().unwrap(IgniteTable.class);
- Mappings.TargetMapping mapping = RexUtils.invercePermutation(projects,
+ Mappings.TargetMapping mapping = RexUtils.inversePermutation(projects,
tbl.getRowType(typeFactory, requiredColumns), true);
RexShuttle shuttle = new RexShuttle() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
index 08c1609..a174cf8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.rel.logical;
import java.util.List;
+
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
@@ -26,15 +27,14 @@ import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Pair;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIndexScan;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
-import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
/** */
@@ -47,27 +47,37 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
String idxName,
@Nullable List<RexNode> proj,
@Nullable RexNode cond,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable ImmutableBitSet requiredColumns
) {
IgniteTable tbl = table.unwrap(IgniteTable.class);
IgniteTypeFactory typeFactory = Commons.typeFactory(cluster);
- RelDataType rowType = tbl.getRowType(typeFactory, requiredColunms);
+ RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
RelCollation collation = tbl.getIndex(idxName).collation();
- if (requiredColunms != null) {
- Mappings.TargetMapping targetMapping = Commons.mapping(requiredColunms,
+ if (requiredColumns != null) {
+ Mappings.TargetMapping targetMapping = Commons.mapping(requiredColumns,
tbl.getRowType(typeFactory).getFieldCount());
collation = collation.apply(targetMapping);
if (proj != null)
collation = TraitUtils.projectCollation(collation, proj, rowType);
}
- Pair<List<RexNode>, List<RexNode>> pair = RexUtils.buildIndexConditions(cluster, collation, cond, rowType);
-
- List<RexNode> lowerIdxCond = F.isEmpty(pair.left) ? null : pair.left;
- List<RexNode> upperIdxCond = F.isEmpty(pair.right) ? null : pair.right;
+ IndexConditions idxCond = RexUtils.buildIndexConditions(
+ cluster,
+ collation,
+ cond,
+ tbl.getRowType(typeFactory),
+ requiredColumns);
- return new IgniteLogicalIndexScan(cluster, traits, table, idxName, proj, cond, lowerIdxCond, upperIdxCond, requiredColunms);
+ return new IgniteLogicalIndexScan(
+ cluster,
+ traits,
+ table,
+ idxName,
+ proj,
+ cond,
+ idxCond,
+ requiredColumns);
}
/**
@@ -78,9 +88,8 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
* @param idxName Index name.
* @param proj Projects.
* @param cond Filters.
- * @param lowerCond Lower condition.
- * @param upperCond Upper condition.
- * @param requiredColunms Participating colunms.
+ * @param idxCond Index conditions.
+ * @param requiredCols Participating columns.
*/
private IgniteLogicalIndexScan(
RelOptCluster cluster,
@@ -89,10 +98,9 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
String idxName,
@Nullable List<RexNode> proj,
@Nullable RexNode cond,
- @Nullable List<RexNode> lowerCond,
- @Nullable List<RexNode> upperCond,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable IndexConditions idxCond,
+ @Nullable ImmutableBitSet requiredCols
) {
- super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, lowerCond, upperCond, requiredColunms);
+ super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, idxCond, requiredCols);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index 3a08413..4992795 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.rel.logical;
import java.util.List;
+
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
@@ -36,9 +37,9 @@ public class IgniteLogicalTableScan extends ProjectableFilterableTableScan {
RelOptTable tbl,
@Nullable List<RexNode> proj,
@Nullable RexNode cond,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable ImmutableBitSet requiredColumns
) {
- return new IgniteLogicalTableScan(cluster, traits, tbl, proj, cond, requiredColunms);
+ return new IgniteLogicalTableScan(cluster, traits, tbl, proj, cond, requiredColumns);
}
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
index d4f2da0..d035404 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRule;
@@ -117,7 +118,10 @@ public class CorrelatedNestedLoopJoinRule extends ConverterRule {
RelTraitSet filterInTraits = rel.getRight().getTraitSet().replace(RewindabilityTrait.REWINDABLE);
// Push a filter with batchSize disjunctions
- relBuilder.push(rel.getRight().copy(filterInTraits, rel.getRight().getInputs())).filter(relBuilder.or(conditionList));
+ relBuilder
+ .push(rel.getRight().copy(filterInTraits, rel.getRight().getInputs()))
+ .filter(relBuilder.or(conditionList));
+
RelNode right = relBuilder.build();
CorrelationTrait corrTrait = CorrelationTrait.correlations(correlationIds);
@@ -135,7 +139,17 @@ public class CorrelatedNestedLoopJoinRule extends ConverterRule {
RelNode left = convert(rel.getLeft(), leftInTraits);
right = convert(right, rightInTraits);
- call.transformTo(new IgniteCorrelatedNestedLoopJoin(cluster, outTraits, left, right, rel.getCondition(), correlationIds, joinType));
+ call.transformTo(
+ new IgniteCorrelatedNestedLoopJoin(
+ cluster,
+ outTraits,
+ left,
+ right,
+ rel.getCondition(),
+ correlationIds,
+ joinType
+ )
+ );
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterSpoolMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterSpoolMergeRule.java
new file mode 100644
index 0000000..eb989ea
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterSpoolMergeRule.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Spool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
+import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Planner rule that replaces an {@link IgniteFilter} placed over an {@link IgniteTableSpool} with a single
+ * {@link IgniteIndexSpool}, provided that index conditions can be built from the filter condition.
+ */
+public class FilterSpoolMergeRule extends RelRule<FilterSpoolMergeRule.Config> {
+    /** Instance. */
+    public static final RelOptRule INSTANCE = Config.DEFAULT.toRule();
+
+    /**
+     * @param cfg Rule configuration.
+     */
+    private FilterSpoolMergeRule(Config cfg) {
+        super(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMatch(RelOptRuleCall call) {
+        final IgniteFilter filter = call.rel(0);
+        final IgniteTableSpool spool = call.rel(1);
+
+        RelOptCluster cluster = spool.getCluster();
+        RelNode spoolInput = spool.getInput();
+
+        // The collation of the spool input is used both to build the index conditions and as the collation
+        // of the resulting index spool.
+        RelCollation inputCollation = TraitUtils.collation(spoolInput);
+
+        IndexConditions idxCond = RexUtils.buildIndexConditions(
+            cluster,
+            inputCollation,
+            filter.getCondition(),
+            spool.getRowType(),
+            null
+        );
+
+        boolean noLowerBound = F.isEmpty(idxCond.lowerCondition());
+        boolean noUpperBound = F.isEmpty(idxCond.upperCondition());
+
+        // Neither bound could be derived from the filter: nothing to merge.
+        if (noLowerBound && noUpperBound)
+            return;
+
+        // The result keeps the traits of the spool; the correlation of the filter is added when it is correlated.
+        CorrelationTrait filterCorr = TraitUtils.correlation(filter);
+        RelTraitSet spoolTraits = spool.getTraitSet();
+        RelTraitSet outTraits = filterCorr.correlated() ? spoolTraits.replace(filterCorr) : spoolTraits;
+
+        RelNode convertedInput = convert(spoolInput, spoolInput.getTraitSet().replace(inputCollation));
+
+        call.transformTo(
+            new IgniteIndexSpool(
+                cluster,
+                outTraits.replace(inputCollation),
+                convertedInput,
+                inputCollation,
+                filter.getCondition(),
+                idxCond
+            )
+        );
+    }
+
+    /** Rule configuration. */
+    @SuppressWarnings("ClassNameSameAsAncestorName")
+    public interface Config extends RelRule.Config {
+        /** Default configuration: matches an {@link IgniteFilter} whose single input is an {@link IgniteTableSpool}. */
+        Config DEFAULT = RelRule.Config.EMPTY
+            .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+            .withDescription("FilterSpoolMergeRule")
+            .as(FilterSpoolMergeRule.Config.class)
+            .withOperandFor(IgniteFilter.class, IgniteTableSpool.class);
+
+        /**
+         * Defines an operand tree for the given classes.
+         *
+         * @param filterClass Class of the filter node to match.
+         * @param spoolClass Class of the spool node expected as the only input of the filter.
+         * @return Configuration with the operand tree set.
+         */
+        default Config withOperandFor(Class<? extends Filter> filterClass, Class<? extends Spool> spoolClass) {
+            return withOperandSupplier(
+                filterOperand -> filterOperand.operand(filterClass)
+                    .oneInput(spoolOperand -> spoolOperand.operand(spoolClass).anyInputs())
+            )
+                .as(Config.class);
+        }
+
+        /** {@inheritDoc} */
+        @Override default FilterSpoolMergeRule toRule() {
+            return new FilterSpoolMergeRule(this);
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
index 5d6f546..7c3226d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
@@ -36,20 +36,35 @@ import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTableScan> extends AbstractIgniteConverterRule<T> {
/** Instance. */
public static final LogicalScanConverterRule<IgniteLogicalIndexScan> INDEX_SCAN =
- new LogicalScanConverterRule<IgniteLogicalIndexScan>(IgniteLogicalIndexScan.class, "LogicalTableScanConverterRule") {
+ new LogicalScanConverterRule<IgniteLogicalIndexScan>(IgniteLogicalIndexScan.class, "LogicalIndexScanConverterRule") {
/** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, IgniteLogicalIndexScan rel) {
- return new IgniteIndexScan(rel.getCluster(), rel.getTraitSet().replace(IgniteConvention.INSTANCE),
- rel.getTable(), rel.indexName(), rel.projects(), rel.condition(), rel.lowerCondition(),
- rel.upperCondition(), rel.requiredColumns());
+ @Override protected PhysicalNode convert(
+ RelOptPlanner planner,
+ RelMetadataQuery mq,
+ IgniteLogicalIndexScan rel
+ ) {
+ return new IgniteIndexScan(
+ rel.getCluster(),
+ rel.getTraitSet().replace(IgniteConvention.INSTANCE),
+ rel.getTable(),
+ rel.indexName(),
+ rel.projects(),
+ rel.condition(),
+ rel.indexConditions(),
+ rel.requiredColumns()
+ );
}
};
/** Instance. */
public static final LogicalScanConverterRule<IgniteLogicalTableScan> TABLE_SCAN =
- new LogicalScanConverterRule<IgniteLogicalTableScan>(IgniteLogicalTableScan.class, "LogicalIndexScanConverterRule") {
+ new LogicalScanConverterRule<IgniteLogicalTableScan>(IgniteLogicalTableScan.class, "LogicalTableScanConverterRule") {
/** {@inheritDoc} */
- @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, IgniteLogicalTableScan rel) {
+ @Override protected PhysicalNode convert(
+ RelOptPlanner planner,
+ RelMetadataQuery mq,
+ IgniteLogicalTableScan rel
+ ) {
RelTraitSet traits = rel.getTraitSet().replace(IgniteConvention.INSTANCE);
Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
index f92de43..95bb653 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
@@ -50,10 +50,22 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
"ProjectIndexScanMergeRule"
) {
/** {@inheritDoc} */
- @Override protected IgniteLogicalIndexScan createNode(RelOptCluster cluster, IgniteLogicalIndexScan scan,
- RelTraitSet traits, List<RexNode> projections, RexNode cond, ImmutableBitSet requiredColunms) {
- return IgniteLogicalIndexScan.create(cluster, traits, scan.getTable(), scan.indexName(), projections,
- cond, requiredColunms);
+ @Override protected IgniteLogicalIndexScan createNode(
+ RelOptCluster cluster,
+ IgniteLogicalIndexScan scan,
+ RelTraitSet traits,
+ List<RexNode> projections,
+ RexNode cond,
+ ImmutableBitSet requiredColumns
+ ) {
+ return IgniteLogicalIndexScan.create(
+ cluster,
+ traits,
+ scan.getTable(),
+ scan.indexName(),
+ projections,
+ cond, requiredColumns
+ );
}
};
@@ -65,15 +77,34 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
"ProjectTableScanMergeRule"
) {
/** {@inheritDoc} */
- @Override protected IgniteLogicalTableScan createNode(RelOptCluster cluster, IgniteLogicalTableScan scan,
- RelTraitSet traits, List<RexNode> projections, RexNode cond, ImmutableBitSet requiredColunms) {
- return IgniteLogicalTableScan.create(cluster, traits, scan.getTable(), projections, cond, requiredColunms);
+ @Override protected IgniteLogicalTableScan createNode(
+ RelOptCluster cluster,
+ IgniteLogicalTableScan scan,
+ RelTraitSet traits,
+ List<RexNode> projections,
+ RexNode cond,
+ ImmutableBitSet requiredColumns
+ ) {
+ return IgniteLogicalTableScan.create(
+ cluster,
+ traits,
+ scan.getTable(),
+ projections,
+ cond,
+ requiredColumns
+ );
}
};
/** */
- protected abstract T createNode(RelOptCluster cluster, T scan, RelTraitSet traits, List<RexNode> projections,
- RexNode cond, ImmutableBitSet requiredColunms);
+ protected abstract T createNode(
+ RelOptCluster cluster,
+ T scan,
+ RelTraitSet traits,
+ List<RexNode> projections,
+ RexNode cond,
+ ImmutableBitSet requiredColumns
+ );
/**
* Constructor.
@@ -134,9 +165,9 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
}
}.apply(cond);
- ImmutableBitSet requiredColunms = builder.build();
+ ImmutableBitSet requiredColumns = builder.build();
- Mappings.TargetMapping targetMapping = Commons.mapping(requiredColunms,
+ Mappings.TargetMapping targetMapping = Commons.mapping(requiredColumns,
tbl.getRowType(typeFactory).getFieldCount());
projects = new RexShuttle() {
@@ -145,7 +176,7 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
}
}.apply(projects);
- if (RexUtils.isIdentity(projects, tbl.getRowType(typeFactory, requiredColunms), true))
+ if (RexUtils.isIdentity(projects, tbl.getRowType(typeFactory, requiredColumns), true))
projects = null;
cond = new RexShuttle() {
@@ -154,6 +185,6 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
}
}.apply(cond);
- call.transformTo(createNode(cluster, scan, traits, projects, cond, requiredColunms));
+ call.transformTo(createNode(cluster, scan, traits, projects, cond, requiredColumns));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
index 8b6ad1a..f99a9be 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
@@ -82,11 +82,11 @@ public class IgniteIndex {
Supplier<Row> lowerIdxConditions,
Supplier<Row> upperIdxConditions,
Function<Row, Row> rowTransformer,
- @Nullable ImmutableBitSet requiredColunms) {
+ @Nullable ImmutableBitSet requiredColumns) {
UUID localNodeId = execCtx.planningContext().localNodeId();
if (group.nodeIds().contains(localNodeId))
return new IndexScan<>(execCtx, table().descriptor(), idx, group.partitions(localNodeId),
- filters, lowerIdxConditions, upperIdxConditions, rowTransformer, requiredColunms);
+ filters, lowerIdxConditions, upperIdxConditions, rowTransformer, requiredColumns);
return Collections.emptyList();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index 61073d4..3e06946 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -207,7 +207,7 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
ExecutionContext<Row> ectx,
CacheDataRow row,
RowHandler.RowFactory<Row> factory,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable ImmutableBitSet requiredColumns
) throws IgniteCheckedException {
RowHandler<Row> handler = factory.handler();
@@ -215,9 +215,9 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
Row res = factory.create();
- assert handler.columnCount(res) == (requiredColunms == null ? descriptors.length : requiredColunms.cardinality());
+ assert handler.columnCount(res) == (requiredColumns == null ? descriptors.length : requiredColumns.cardinality());
- if (requiredColunms == null) {
+ if (requiredColumns == null) {
for (int i = 0; i < descriptors.length; i++) {
ColumnDescriptor desc = descriptors[i];
@@ -226,7 +226,7 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
}
}
else {
- for (int i = 0, j = requiredColunms.nextSetBit(0); j != -1; j = requiredColunms.nextSetBit(j + 1), i++) {
+ for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
ColumnDescriptor desc = descriptors[j];
handler.set(i, res, TypeUtils.toInternal(ectx,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 9ea6bb1..3856c83 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -35,6 +36,7 @@ import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
@@ -436,6 +438,18 @@ public class TraitUtils {
return processed;
}
+    /**
+     * Builds a collation made of one field collation per provided key, in the order of the keys.
+     *
+     * @param keys The keys to create collation from.
+     * @return New collation.
+     */
+    public static RelCollation createCollation(List<Integer> keys) {
+        List<RelFieldCollation> fieldCollations = keys.stream()
+            .map(key -> new RelFieldCollation(key))
+            .collect(Collectors.toList());
+
+        return RelCollations.of(fieldCollations);
+    }
+
/** */
private static class PropagationContext {
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 867882f..7dc7a32 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.util;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+
import org.apache.calcite.config.CalciteSystemProperty;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.Context;
@@ -325,10 +327,57 @@ public final class Commons {
}
/** */
- public static Mappings.TargetMapping inverceMapping(ImmutableBitSet bitSet, int sourceSize) {
+ public static Mappings.TargetMapping inverseMapping(ImmutableBitSet bitSet, int sourceSize) {
Mapping mapping = Mappings.create(MappingType.INVERSE_FUNCTION, sourceSize, bitSet.cardinality());
for (Ord<Integer> ord : Ord.zip(bitSet))
mapping.set(ord.e, ord.i);
return mapping;
}
+
+ /**
+ * Checks whether some permutation of all {@code elems} is a prefix of the provided {@code seq}.
+ * Duplicates in {@code elems} are ignored, because the elements are collected into a set first.
+ * Note: for an empty {@code elems} the result is {@code true} only when {@code seq} is empty as well,
+ * since the first element of a non-empty {@code seq} cannot be matched against an empty set.
+ *
+ * @param seq Sequence.
+ * @param elems Elements expected at the head of {@code seq}, in any order.
+ * @return {@code true} if there is a permutation of all {@code elems} that is a prefix of {@code seq}.
+ */
+ public static <T> boolean isPrefix(List<T> seq, Collection<T> elems) {
+ Set<T> elems0 = new HashSet<>(elems);
+
+ if (seq.size() < elems0.size())
+ return false;
+
+ for (T e : seq) {
+ if (!elems0.remove(e))
+ return false;
+
+ if (elems0.isEmpty())
+ break;
+ }
+
+ return true;
+ }
+
+ /**
+ * Returns the longest prefix of {@code seq} that can be formed from the provided {@code elems}.
+ * Every distinct element of {@code elems} is used at most once; the scan stops at the first element
+ * of {@code seq} that is not among the remaining ones.
+ *
+ * @param seq Sequence.
+ * @param elems Elements that are allowed to appear in the prefix.
+ * @return The longest possible prefix of {@code seq} (a new list, empty if the first element does not match).
+ */
+ public static <T> List<T> maxPrefix(List<T> seq, Collection<T> elems) {
+ List<T> res = new ArrayList<>();
+
+ Set<T> elems0 = new HashSet<>(elems);
+
+ for (T e : seq) {
+ if (!elems0.remove(e))
+ break;
+
+ res.add(e);
+ }
+
+ return res;
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IndexConditions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IndexConditions.java
new file mode 100644
index 0000000..230652b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IndexConditions.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Index conditions and bounds holder.
+ * Conditions are not printed to terms (serialized). They are used only to calculate selectivity.
+ * All four lists are optional and are stored as passed in, without copying.
+ */
+public class IndexConditions {
+ /** Lower index condition, may be {@code null}. */
+ private final List<RexNode> lowerCond;
+
+ /** Upper index condition, may be {@code null}. */
+ private final List<RexNode> upperCond;
+
+ /** Lower index bounds (a row with values at the index columns), may be {@code null}. */
+ private final List<RexNode> lowerBound;
+
+ /** Upper index bounds (a row with values at the index columns), may be {@code null}. */
+ private final List<RexNode> upperBound;
+
+ /** Creates a holder without conditions and bounds: all four lists are {@code null}. */
+ public IndexConditions() {
+ this(null, null, null, null);
+ }
+
+ /**
+ * Creates a holder from the given conditions and bounds.
+ *
+ * @param lowerCond Lower index condition.
+ * @param upperCond Upper index condition.
+ * @param lowerBound Lower index bounds.
+ * @param upperBound Upper index bounds.
+ */
+ public IndexConditions(
+ @Nullable List<RexNode> lowerCond,
+ @Nullable List<RexNode> upperCond,
+ @Nullable List<RexNode> lowerBound,
+ @Nullable List<RexNode> upperBound
+ ) {
+ this.lowerCond = lowerCond;
+ this.upperCond = upperCond;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
+
+ /**
+ * Restores the bounds from a serialized relational node.
+ * Conditions are not serialized, so both of them are left {@code null}.
+ *
+ * @param input Serialized representation; the "lower" and "upper" items are read when they are present.
+ */
+ public IndexConditions(RelInput input) {
+ lowerCond = null;
+ upperCond = null;
+ lowerBound = input.get("lower") == null ? null : input.getExpressionList("lower");
+ upperBound = input.get("upper") == null ? null : input.getExpressionList("upper");
+ }
+
+ /**
+ * @return Lower index condition.
+ */
+ public List<RexNode> lowerCondition() {
+ return lowerCond;
+ }
+
+ /**
+ * @return Upper index condition.
+ */
+ public List<RexNode> upperCondition() {
+ return upperCond;
+ }
+
+ /**
+ * @return Lower index bounds (a row with values at the index columns).
+ */
+ public List<RexNode> lowerBound() {
+ return lowerBound;
+ }
+
+ /**
+ * @return Upper index bounds (a row with values at the index columns).
+ */
+ public List<RexNode> upperBound() {
+ return upperBound;
+ }
+
+ /**
+ * Collects the positions of the bound rows that carry a value.
+ * A position is included when {@link RexUtils#isNotNull(RexNode)} holds for the expression at this position
+ * in at least one of the bounds. The number of positions is taken from the lower bound when it is set,
+ * otherwise from the upper bound.
+ *
+ * @return Positions that have a bound value; an empty list when neither bound is set.
+ */
+ public ImmutableIntList keys() {
+ if (upperBound == null && lowerBound == null)
+ return ImmutableIntList.of();
+
+ List<Integer> keys = new ArrayList<>();
+
+ int cols = lowerBound != null ? lowerBound.size() : upperBound.size();
+
+ for (int i = 0; i < cols; ++i ) {
+ if (upperBound != null && RexUtils.isNotNull(upperBound.get(i))
+ || lowerBound != null && RexUtils.isNotNull(lowerBound.get(i)))
+ keys.add(i);
+ }
+
+ return ImmutableIntList.copyOf(keys);
+ }
+
+ /**
+ * Describes index bounds. Only the bounds are written (as the "lower" and "upper" items),
+ * and each of them is skipped when {@code F.isEmpty} reports it as empty.
+ *
+ * @param pw Plan writer
+ * @return Plan writer for fluent-explain pattern
+ */
+ public RelWriter explainTerms(RelWriter pw) {
+ return pw
+ .itemIf("lower", lowerBound, !F.isEmpty(lowerBound))
+ .itemIf("upper", upperBound, !F.isEmpty(upperBound));
+ }
+
+ /** Prints the conditions under the "lower"/"upper" names and the bounds under "lowerBound"/"upperBound". */
+ @Override public String toString() {
+ return S.toString(IndexConditions.class, this,
+ "lower", lowerCond,
+ "upper", upperCond,
+ "lowerBound", lowerBound,
+ "upperBound", upperBound);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
index f67b44e..da3a94e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.google.common.collect.ImmutableList;
+
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPredicateList;
@@ -58,8 +58,8 @@ import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ControlFlowException;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Litmus;
-import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.mapping.MappingType;
import org.apache.calcite.util.mapping.Mappings;
@@ -159,16 +159,22 @@ public class RexUtils {
/**
* Builds index conditions.
*/
- public static Pair<List<RexNode>, List<RexNode>> buildIndexConditions(RelOptCluster cluster, RelCollation collation, RexNode condition, RelDataType rowType) {
+ public static IndexConditions buildIndexConditions(
+ RelOptCluster cluster,
+ RelCollation collation,
+ RexNode condition,
+ RelDataType rowType,
+ ImmutableBitSet requiredColumns
+ ) {
if (condition == null || collation == null || collation.getFieldCollations().isEmpty())
- return Pair.of(ImmutableList.of(), ImmutableList.of());
+ return new IndexConditions();
condition = RexUtil.toCnf(builder(cluster), condition);
Map<Integer, List<RexCall>> fieldsToPredicates = mapPredicatesToFields(condition, cluster);
if (F.isEmpty(fieldsToPredicates))
- return Pair.of(ImmutableList.of(), ImmutableList.of());
+ return new IndexConditions();
List<RexNode> lower = new ArrayList<>();
List<RexNode> upper = new ArrayList<>();
@@ -248,7 +254,25 @@ public class RexUtils {
}
}
- return Pair.of(lower, upper);
+ Mappings.TargetMapping mapping = null;
+
+ if (requiredColumns != null)
+ mapping = Commons.inverseMapping(requiredColumns, rowType.getFieldCount());
+
+ List<RexNode> lowerBound = null;
+ List<RexNode> upperBound = null;
+
+ if (!F.isEmpty(lower))
+ lowerBound = asBound(cluster, lower, rowType, mapping);
+ else
+ lower = null;
+
+ if (!F.isEmpty(upper))
+ upperBound = asBound(cluster, upper, rowType, mapping);
+ else
+ upper = null;
+
+ return new IndexConditions(lower, upper, lowerBound, upperBound);
}
/** */
@@ -262,7 +286,7 @@ public class RexUtils {
continue;
RexCall predCall = (RexCall)rexNode;
- RexLocalRef ref = (RexLocalRef)extractRef(predCall);
+ RexSlot ref = (RexSlot)extractRef(predCall);
if (ref == null)
continue;
@@ -288,9 +312,9 @@ public class RexUtils {
leftOp = RexUtil.removeCast(leftOp);
rightOp = RexUtil.removeCast(rightOp);
- if (leftOp instanceof RexLocalRef && idxOpSupports(rightOp))
+ if ((leftOp instanceof RexLocalRef || leftOp instanceof RexInputRef) && idxOpSupports(rightOp))
return leftOp;
- else if (rightOp instanceof RexLocalRef && idxOpSupports(leftOp))
+ else if ((rightOp instanceof RexLocalRef || rightOp instanceof RexInputRef) && idxOpSupports(leftOp))
return rightOp;
return null;
@@ -302,7 +326,7 @@ public class RexUtils {
rightOp = RexUtil.removeCast(rightOp);
- return rightOp.isA(SqlKind.LOCAL_REF);
+ return rightOp.isA(SqlKind.LOCAL_REF) || rightOp.isA(SqlKind.INPUT_REF);
}
/** */
@@ -325,6 +349,14 @@ public class RexUtils {
}
/** */
+ public static boolean isNotNull(RexNode op) {
+ if (op == null)
+ return false;
+
+ return !(op instanceof RexLiteral) || !((RexLiteral)op).isNull();
+ }
+
+ /** */
public static List<RexNode> asBound(RelOptCluster cluster, Iterable<RexNode> idxCond, RelDataType rowType, @Nullable Mappings.TargetMapping mapping) {
if (F.isEmpty(idxCond))
return null;
@@ -337,7 +369,7 @@ public class RexUtils {
assert pred instanceof RexCall;
RexCall call = (RexCall)pred;
- RexLocalRef ref = (RexLocalRef)RexUtil.removeCast(call.operands.get(0));
+ RexSlot ref = (RexSlot)RexUtil.removeCast(call.operands.get(0));
RexNode cond = RexUtil.removeCast(call.operands.get(1));
assert idxOpSupports(cond) : cond;
@@ -367,7 +399,7 @@ public class RexUtils {
}
/** */
- public static Mappings.TargetMapping invercePermutation(List<RexNode> nodes, RelDataType inputRowType, boolean local) {
+ public static Mappings.TargetMapping inversePermutation(List<RexNode> nodes, RelDataType inputRowType, boolean local) {
final Mappings.TargetMapping mapping =
Mappings.create(MappingType.INVERSE_FUNCTION, nodes.size(), inputRowType.getFieldCount());
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableSpoolTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
similarity index 91%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableSpoolTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
index ec6b926..a040acc 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableSpoolTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
@@ -19,10 +19,13 @@ package org.apache.ignite.internal.processors.query.calcite;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -41,10 +44,10 @@ import static org.junit.Assert.assertThat;
/**
- * IndexSpool test.
+ * Index spool test.
*/
@RunWith(Parameterized.class)
-public class TableSpoolTest extends GridCommonAbstractTest {
+public class IndexSpoolIntegrationTest extends GridCommonAbstractTest {
/** Rows. */
private static final int[] ROWS = {1, 10, 512, 513, 2000};
@@ -95,7 +98,8 @@ public class TableSpoolTest extends GridCommonAbstractTest {
.setKeyFieldName("ID")
.addQueryField("ID", Integer.class.getName(), null)
.addQueryField("JID", Integer.class.getName(), null)
- .addQueryField("VAL", String.class.getName(), null);
+ .addQueryField("VAL", String.class.getName(), null)
+ .setIndexes(Collections.singletonList(new QueryIndex("JID")));
QueryEntity part1 = new QueryEntity()
.setTableName("TEST1")
@@ -104,7 +108,8 @@ public class TableSpoolTest extends GridCommonAbstractTest {
.setKeyFieldName("ID")
.addQueryField("ID", Integer.class.getName(), null)
.addQueryField("JID", Integer.class.getName(), null)
- .addQueryField("VAL", String.class.getName(), null);
+ .addQueryField("VAL", String.class.getName(), null)
+ .setIndexes(Collections.singletonList(new QueryIndex("JID")));
return super.getConfiguration(igniteInstanceName)
.setCacheConfiguration(
@@ -131,7 +136,7 @@ public class TableSpoolTest extends GridCommonAbstractTest {
List<FieldsQueryCursor<List<?>>> cursors = engine.query(
null,
"PUBLIC",
- "SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter') */" +
+ "SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter') */" +
"T0.val, T1.val FROM TEST0 as T0 " +
"JOIN TEST1 as T1 on T0.jid = T1.jid ",
X.EMPTY_OBJECT_ARRAY
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndexTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndexTest.java
new file mode 100644
index 0000000..31a1d6a
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndexTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks range lookups over {@link RuntimeTreeIndex}: an index is filled with generated rows
+ * and the number of rows returned for random [lower, upper] bounds is compared with the expected one.
+ */
+public class RuntimeTreeIndexTest extends GridCommonAbstractTest {
+    /** Number of distinct key values in every generated index. */
+    private static final int UNIQUE_GROUPS = 10_000;
+
+    /** Numbers of rows that share the same key value. */
+    private static final int[] NOT_UNIQUE_ROWS_IN_GROUP = new int[] {1, 10};
+
+    /** Tested row types: field classes paired with the indexed columns. */
+    private static final Pair<Class<?>[], ImmutableIntList>[] ROW_TYPES = new Pair[] {
+        new Pair(new Class<?>[] {int.class, int.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, long.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, String.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, Date.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, Time.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, Timestamp.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, String.class, Time.class, Date.class, Timestamp.class, int.class},
+            ImmutableIntList.of(1, 2, 3, 4))
+    };
+
+    /** Search count. */
+    private static final int SEARCH_CNT = UNIQUE_GROUPS / 100;
+
+    /**
+     * Builds an index for every row type and group size and runs {@link #SEARCH_CNT} range searches on it.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test() throws Exception {
+        IgniteTypeFactory tf = new IgniteTypeFactory();
+
+        List<Pair<RelDataType, ImmutableIntList>> testIndexes = Arrays.stream(ROW_TYPES)
+            .map(rt -> Pair.of(TypeUtils.createRowType(tf, rt.getKey()), rt.getValue()))
+            .collect(Collectors.toList());
+
+        for (Pair<RelDataType, ImmutableIntList> testIdx : testIndexes) {
+            for (int notUnique : NOT_UNIQUE_ROWS_IN_GROUP) {
+                RuntimeTreeIndex<Object[]> idx0 = generate(testIdx.getKey(), testIdx.getValue(), notUnique);
+
+                int totalRows = UNIQUE_GROUPS * notUnique;
+
+                for (int searchNum = 0; searchNum < SEARCH_CNT; ++searchNum) {
+                    // Bounds are drawn for every search: otherwise all SEARCH_CNT iterations
+                    // would repeat exactly the same lookup.
+                    int rowIdLow = ThreadLocalRandom.current().nextInt(totalRows);
+                    int rowIdUp = rowIdLow + ThreadLocalRandom.current().nextInt(totalRows - rowIdLow);
+
+                    Object[] lower = generateFindRow(rowIdLow, testIdx.getKey(), notUnique, testIdx.getValue());
+                    Object[] upper = generateFindRow(rowIdUp, testIdx.getKey(), notUnique, testIdx.getValue());
+
+                    GridCursor<Object[]> cur = idx0.find(lower, upper, null);
+
+                    int rows = 0;
+                    while (cur.next()) {
+                        cur.get();
+
+                        rows++;
+                    }
+
+                    // Key value of a row is rowId / notUnique and every key value occurs notUnique times,
+                    // so an inclusive range covers whole groups of rows.
+                    assertEquals("Invalid results [rowType=" + testIdx.getKey() + ", notUnique=" + notUnique +
+                            ", rowIdLow=" + rowIdLow + ", rowIdUp=" + rowIdUp + ']',
+                        (rowIdUp / notUnique - rowIdLow / notUnique + 1) * notUnique,
+                        rows);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates an index and fills it with {@code UNIQUE_GROUPS * notUnique} rows:
+     * first in random order, then with all the row ids the random pass has missed.
+     *
+     * @param rowType Row type.
+     * @param idxCols Indexed columns.
+     * @param notUnique Number of rows that share the same key value.
+     * @return Filled index.
+     */
+    private RuntimeTreeIndex<Object[]> generate(RelDataType rowType, final List<Integer> idxCols, int notUnique) {
+        RuntimeTreeIndex<Object[]> idx = new RuntimeTreeIndex<>(
+            new ExecutionContext<>(
+                null,
+                PlanningContext.builder()
+                    .logger(log())
+                    .build(),
+                null,
+                null,
+                ArrayRowHandler.INSTANCE,
+                null),
+            RelCollations.of(ImmutableIntList.copyOf(idxCols)),
+            (o1, o2) -> {
+                for (int colIdx : idxCols) {
+                    int res = ((Comparable)o1[colIdx]).compareTo(o2[colIdx]);
+
+                    if (res != 0)
+                        return res;
+                }
+
+                return 0;
+            });
+
+        BitSet rowIds = new BitSet(UNIQUE_GROUPS);
+
+        // First random fill
+        for (int i = 0; i < UNIQUE_GROUPS * notUnique; ++i) {
+            int rowId = ThreadLocalRandom.current().nextInt(UNIQUE_GROUPS);
+
+            if (!rowIds.get(rowId)) {
+                idx.push(generateRow(rowId, rowType, notUnique));
+                rowIds.set(rowId);
+            }
+        }
+
+        // Push every row id that was not picked by the random pass.
+        for (int i = 0; i < UNIQUE_GROUPS * notUnique; ++i) {
+            if (!rowIds.get(i))
+                idx.push(generateRow(i, rowType, notUnique));
+        }
+
+        return idx;
+    }
+
+    /**
+     * @param rowId Row id.
+     * @param rowType Row type.
+     * @param notUnique Number of rows that share the same key value.
+     * @return Row with a generated value in every field.
+     */
+    private Object[] generateRow(int rowId, RelDataType rowType, int notUnique) {
+        Object[] row = new Object[rowType.getFieldCount()];
+
+        for (int i = 0; i < rowType.getFieldCount(); ++i)
+            row[i] = generateValue(rowId, rowType.getFieldList().get(i), notUnique);
+
+        return row;
+    }
+
+    /**
+     * @param rowId Row id.
+     * @param rowType Row type.
+     * @param notUnique Number of rows that share the same key value.
+     * @param idxCols Indexed columns.
+     * @return Search row: generated values at the indexed columns and {@code null} everywhere else.
+     */
+    private Object[] generateFindRow(int rowId, RelDataType rowType, int notUnique, final List<Integer> idxCols) {
+        Object[] row = generateRow(rowId, rowType, notUnique);
+
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!idxCols.contains(i))
+                row[i] = null;
+        }
+
+        return row;
+    }
+
+    /**
+     * Generates a field value from {@code rowId / notUnique}, so that {@code notUnique} consecutive
+     * row ids get the same value.
+     *
+     * @param rowId Row id.
+     * @param field Field to generate the value for.
+     * @param notUnique Number of rows that share the same key value.
+     * @return Generated value.
+     */
+    private Object generateValue(int rowId, RelDataTypeField field, int notUnique) {
+        long mod = rowId / notUnique;
+        long baseDate = 1_000_000L;
+
+        switch (field.getType().getSqlTypeName().getFamily()) {
+            case NUMERIC:
+                return mod;
+
+            case CHARACTER:
+                return "val " + String.format("%07d", mod);
+
+            case DATE:
+                return new Date(baseDate + mod * 10_000);
+
+            case TIME:
+                return new Time(mod);
+
+            case TIMESTAMP:
+                return new Timestamp(baseDate + mod);
+
+            default:
+                assert false : "Not supported type for test: " + field.getType();
+                return null;
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 9523d6a..02bc083 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -28,11 +29,14 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
@@ -52,6 +56,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
@@ -317,4 +322,99 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
// No-op;
}
}
+
+    /**
+     * Synthetic table for tests: produces {@code rowsCnt} rows of the given row type.
+     * Every field value is derived from the row number; only numeric and character fields are supported.
+     */
+    public static class TestTable implements Iterable<Object[]> {
+        /** Number of rows produced by one iteration. */
+        private final int rowsCnt;
+
+        /** Row type. */
+        private final RelDataType rowType;
+
+        /** Value generators, one per field of the row type. */
+        private final Function<Integer, Object>[] fieldCreators;
+
+        /**
+         * @param rowsCnt Number of rows.
+         * @param rowType Row type.
+         */
+        TestTable(int rowsCnt, RelDataType rowType) {
+            this.rowsCnt = rowsCnt;
+            this.rowType = rowType;
+
+            fieldCreators = rowType.getFieldList().stream()
+                .map((Function<RelDataTypeField, Function<Integer, Object>>) (t) -> {
+                    switch (t.getType().getSqlTypeName().getFamily()) {
+                        case NUMERIC:
+                            return TestTable::intField;
+
+                        case CHARACTER:
+                            return TestTable::stringField;
+
+                        default:
+                            assert false : "Not supported type for test: " + t;
+                            return null;
+                    }
+                })
+                .collect(Collectors.toList()).toArray(new Function[rowType.getFieldCount()]);
+        }
+
+        /**
+         * @param rowNum Row number.
+         * @return Value of a character field.
+         */
+        private static Object stringField(Integer rowNum) {
+            return "val_" + rowNum;
+        }
+
+        /**
+         * @param rowNum Row number.
+         * @return Value of a numeric field.
+         */
+        private static Object intField(Integer rowNum) {
+            return rowNum;
+        }
+
+        /**
+         * @param rowNum Row number.
+         * @return New row with all the fields generated from the row number.
+         */
+        private Object[] createRow(int rowNum) {
+            Object[] row = new Object[rowType.getFieldCount()];
+
+            for (int i = 0; i < fieldCreators.length; ++i)
+                row[i] = fieldCreators[i].apply(rowNum);
+
+            return row;
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<Object[]> iterator() {
+            return new Iterator<Object[]>() {
+                private int curRow;
+
+                @Override public boolean hasNext() {
+                    return curRow < rowsCnt;
+                }
+
+                @Override public Object[] next() {
+                    // Iterator contract: do not produce rows past the end of the table.
+                    if (curRow >= rowsCnt)
+                        throw new java.util.NoSuchElementException();
+
+                    return createRow(curRow++);
+                }
+            };
+        }
+    }
+
+ /**
+ * Root node for tests that can be rewound and read again.
+ * {@link #rewindInternal()} resets the {@code waiting} and {@code closed} fields declared in {@link RootNode}
+ * through {@code GridTestUtils.setFieldValue}, and {@link #closeInternal()} is overridden as a no-op;
+ * the parent close logic runs only when {@link #closeRewindableRoot()} is called.
+ */
+ public static class RootRewindable<Row> extends RootNode<Row> {
+ /** Creates the root node; both arguments are passed to the {@link RootNode} constructor as is. */
+ public RootRewindable(ExecutionContext<Row> ctx, RelDataType rowType) {
+ super(ctx, rowType);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void rewindInternal() {
+ GridTestUtils.setFieldValue(this, RootNode.class, "waiting", 0);
+ GridTestUtils.setFieldValue(this, RootNode.class, "closed", false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void closeInternal() {
+ // No-op
+ }
+
+ /** Runs the parent {@code closeInternal()} that the overridden {@link #closeInternal()} skips. */
+ public void closeRewindableRoot() {
+ super.closeInternal();
+ }
+ }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 71d2249..729b79b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -19,12 +19,9 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-import java.util.function.Function;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -32,7 +29,6 @@ import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
@@ -49,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
-import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -1054,74 +1049,6 @@ public class ExecutionTest extends AbstractExecutionTest {
/**
*
*/
- @Test
- public void testTableSpool() {
- ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
- IgniteTypeFactory tf = ctx.getTypeFactory();
- RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
-
- int[] leftSizes = {1, 99, 100, 101, 512, 513, 2000};
- int[] rightSizes = {1, 99, 100, 101, 512, 513, 2000};
- int[] rightBufSizes = {1, 100, 512};
-
- for (int rightBufSize : rightBufSizes) {
- for (int leftSize : leftSizes) {
- for (int rightSize : rightSizes) {
- log.info("Check: rightBufSize=" + rightBufSize + ", leftSize=" + leftSize + ", rightSize=" + rightSize);
-
- ScanNode<Object[]> left = new ScanNode<>(ctx, rowType, new TestTable(leftSize, rowType));
- ScanNode<Object[]> right = new ScanNode<>(ctx, rowType, new TestTable(rightSize, rowType) {
- boolean first = true;
-
- @Override public @NotNull Iterator<Object[]> iterator() {
- assertTrue("Rewind right", first);
-
- first = false;
- return super.iterator();
- }
- });
-
- TableSpoolNode<Object[]> rightSpool = new TableSpoolNode<>(ctx, rowType);
-
- rightSpool.register(Arrays.asList(right));
-
- RelDataType joinRowType = TypeUtils.createRowType(
- tf,
- int.class, String.class, int.class,
- int.class, String.class, int.class);
-
- CorrelatedNestedLoopJoinNode<Object[]> join = new CorrelatedNestedLoopJoinNode<>(
- ctx,
- joinRowType,
- r -> r[0].equals(r[3]),
- ImmutableSet.of(new CorrelationId(0)));
-
- GridTestUtils.setFieldValue(join, "rightInBufferSize", rightBufSize);
-
- join.register(Arrays.asList(left, rightSpool));
-
- RootNode<Object[]> root = new RootNode<>(ctx, joinRowType);
- root.register(join);
-
- int cnt = 0;
- while (root.hasNext()) {
- root.next();
-
- cnt++;
- }
-
- assertEquals(
- "Invalid result size. [left=" + leftSize + ", right=" + rightSize + ", results=" + cnt,
- min(leftSize, rightSize),
- cnt);
- }
- }
- }
- }
-
- /**
- *
- */
private Object[] row(Object... fields) {
return fields;
}
@@ -1155,75 +1082,4 @@ public class ExecutionTest extends AbstractExecutionTest {
}
};
}
-
- /**
- *
- */
- public static class TestTable implements Iterable<Object[]> {
- /** */
- private int rowsCnt;
-
- /** */
- private RelDataType rowType;
-
- /** */
- private Function<Integer, Object>[] fieldCreators;
-
- /** */
- TestTable(int rowsCnt, RelDataType rowType) {
- this.rowsCnt = rowsCnt;
- this.rowType = rowType;
-
- fieldCreators = rowType.getFieldList().stream()
- .map((Function<RelDataTypeField, Function<Integer, Object>>) (t) -> {
- switch (t.getType().getSqlTypeName().getFamily()) {
- case NUMERIC:
- return TestTable::intField;
-
- case CHARACTER:
- return TestTable::stringField;
-
- default:
- assert false : "Not supported type for test: " + t;
- return null;
- }
- })
- .collect(Collectors.toList()).toArray(new Function[rowType.getFieldCount()]);
- }
-
- /** */
- private static Object stringField(Integer rowNum) {
- return "val_" + rowNum;
- }
-
- /** */
- private static Object intField(Integer rowNum) {
- return rowNum;
- }
-
- /** */
- private Object[] createRow(int rowNum) {
- Object[] row = new Object[rowType.getFieldCount()];
-
- for (int i = 0; i < fieldCreators.length; ++i)
- row[i] = fieldCreators[i].apply(rowNum);
-
- return row;
- }
-
- /** {@inheritDoc} */
- @NotNull @Override public Iterator<Object[]> iterator() {
- return new Iterator<Object[]>() {
- private int curRow;
-
- @Override public boolean hasNext() {
- return curRow < rowsCnt;
- }
-
- @Override public Object[] next() {
- return createRow(curRow++);
- }
- };
- }
- }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolExecutionTest.java
new file mode 100644
index 0000000..e0b0806
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolExecutionTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.lang.GridTuple4;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class IndexSpoolExecutionTest extends AbstractExecutionTest {
+ /**
+ * @throws Exception If failed.
+ */
+ @Before
+ @Override public void setup() throws Exception {
+ nodesCnt = 1;
+ super.setup();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testIndexSpool() throws Exception {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
+
+ int inBufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+ int[] sizes = {1, inBufSize / 2 - 1, inBufSize / 2, inBufSize / 2 + 1, inBufSize, inBufSize + 1, inBufSize * 4};
+
+ for (int size : sizes) {
+ // (filter, lower, upper, expected result size)
+ GridTuple4<Predicate<Object[]>, Object[], Object[], Integer>[] testBounds;
+
+ if (size == 1) {
+ testBounds = new GridTuple4[] {
+ new GridTuple4(null, new Object[] {null, null, null}, new Object[] {null, null, null}, 1),
+ new GridTuple4(null, new Object[] {0, null, null}, new Object[] {0, null, null}, 1)
+ };
+ }
+ else {
+ testBounds = new GridTuple4[] {
+ new GridTuple4(
+ null,
+ new Object[] {null, null, null},
+ new Object[] {null, null, null},
+ size
+ ),
+ new GridTuple4(
+ null,
+ new Object[] {size / 2, null, null},
+ new Object[] {size / 2, null, null},
+ 1
+ ),
+ new GridTuple4(
+ null,
+ new Object[] {size / 2 + 1, null, null},
+ new Object[] {size / 2 + 1, null, null},
+ 1
+ ),
+ new GridTuple4(
+ null,
+ new Object[] {size / 2, null, null},
+ new Object[] {null, null, null},
+ size - size / 2
+ ),
+ new GridTuple4(
+ null,
+ new Object[] {null, null, null},
+ new Object[] {size / 2, null, null},
+ size / 2 + 1
+ ),
+ new GridTuple4<>(
+ (Predicate<Object[]>)(r -> ((int)r[0]) < size / 2),
+ new Object[] {null, null, null},
+ new Object[] {size / 2, null, null},
+ size / 2
+ ),
+ };
+ }
+
+ log.info("Check: size=" + size);
+
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, new TestTable(size, rowType) {
+ boolean first = true;
+
+ @Override public @NotNull Iterator<Object[]> iterator() {
+ assertTrue("Rewind right", first);
+
+ first = false;
+ return super.iterator();
+ }
+ });
+
+ Object[] lower = new Object[3];
+ Object[] upper = new Object[3];
+ TestPredicate testFilter = new TestPredicate();
+
+ IndexSpoolNode<Object[]> spool = new IndexSpoolNode<>(
+ ctx,
+ rowType,
+ RelCollations.of(ImmutableIntList.of(0)),
+ (o1, o2) -> o1[0] != null ? ((Comparable)o1[0]).compareTo(o2[0]) : 0,
+ testFilter,
+ () -> lower,
+ () -> upper
+ );
+
+ spool.register(Arrays.asList(scan));
+
+ RootRewindable<Object[]> root = new RootRewindable<>(ctx, rowType);
+ root.register(spool);
+
+ for (GridTuple4<Predicate<Object[]>, Object[], Object[], Integer> bound : testBounds) {
+ log.info("Check: bound=" + bound);
+
+ // Set up bounds
+ testFilter.delegate = bound.get1();
+ System.arraycopy(bound.get2(), 0, lower, 0, lower.length);
+ System.arraycopy(bound.get3(), 0, upper, 0, upper.length);
+
+ int cnt = 0;
+
+ while (root.hasNext()) {
+ root.next();
+
+ cnt++;
+ }
+
+ assertEquals(
+ "Invalid result size",
+ (int)bound.get4(),
+ cnt);
+
+ root.rewind();
+ }
+
+ root.closeRewindableRoot();
+ }
+ }
+
+ /** */
+ static class TestPredicate implements Predicate<Object[]> {
+ /** */
+ Predicate<Object[]> delegate;
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Object[] objects) {
+ if (delegate == null)
+ return true;
+ else
+ return delegate.test(objects);
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
new file mode 100644
index 0000000..6a08401
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class TableSpoolExecutionTest extends AbstractExecutionTest {
+ /**
+ * @throws Exception If failed.
+ */
+ @Before
+ @Override public void setup() throws Exception {
+ nodesCnt = 1;
+ super.setup();
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testTableSpool() throws Exception {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
+
+ int inBufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+ int[] sizes = {1, inBufSize / 2 - 1, inBufSize / 2, inBufSize / 2 + 1, inBufSize, inBufSize + 1, inBufSize * 4};
+// int[] sizes = {inBufSize * 4};
+ int rewindCnts = 32;
+
+ for (int size : sizes) {
+ log.info("Check: size=" + size);
+
+ ScanNode<Object[]> right = new ScanNode<>(ctx, rowType, new TestTable(size, rowType) {
+ boolean first = true;
+
+ @Override public @NotNull Iterator<Object[]> iterator() {
+ assertTrue("Rewind table", first);
+
+ first = false;
+ return super.iterator();
+ }
+ });
+
+ TableSpoolNode<Object[]> spool = new TableSpoolNode<>(ctx, rowType);
+
+ spool.register(Arrays.asList(right));
+
+ RootRewindable<Object[]> root = new RootRewindable<>(ctx, rowType);
+ root.register(spool);
+
+ for (int i = 0; i < rewindCnts; ++i) {
+ int cnt = 0;
+
+ while (root.hasNext()) {
+ root.next();
+
+ cnt++;
+ }
+
+ assertEquals(
+ "Invalid result size",
+ size,
+ cnt);
+
+ root.rewind();
+ }
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
new file mode 100644
index 0000000..6e5b112
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
+import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
+import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
+import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
+import org.apache.ignite.internal.processors.query.calcite.metadata.CollocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.After;
+import org.junit.Before;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
+
+/**
+ *
+ */
+//@WithSystemProperty(key = "calcite.debug", value = "true")
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened", "unchecked"})
+public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
+ /** IDs of the emulated cluster nodes; filled in by {@link #setup()}. */
+ protected List<UUID> nodes;
+
+ /** Task executors that {@link #tearDown()} stops when the list is not empty. */
+ protected List<QueryTaskExecutorImpl> executors;
+
+ /** Last error recorded by {@link TestFailureProcessor}; rethrown from {@link #tearDown()}. */
+ protected volatile Throwable lastE;
+
+    /** Generates the IDs of the four emulated cluster nodes. */
+    @Before
+    public void setup() {
+        nodes = new ArrayList<>(4);
+
+        while (nodes.size() < 4)
+            nodes.add(UUID.randomUUID());
+    }
+
+ /** Stops the executors, if any, and rethrows the last error stored in {@link #lastE}. */
+ @After
+ public void tearDown() throws Throwable {
+ if (!F.isEmpty(executors))
+ executors.forEach(QueryTaskExecutorImpl::tearDown);
+
+ if (lastE != null)
+ throw lastE;
+ }
+
+    /** Callback that receives a node together with its ordinal and parent during a relational tree walk. */
+    interface TestVisitor {
+        void visit(RelNode node, int ordinal, RelNode parent);
+    }
+
+ /** {@link RelVisitor} that reports every visited node to a {@link TestVisitor} before delegating to the default visit. */
+ public static class TestRelVisitor extends RelVisitor {
+ /** Callback notified about each visited node. */
+ final TestVisitor v;
+
+ /** Creates a visitor that forwards the visited nodes to the given callback. */
+ TestRelVisitor(TestVisitor v) {
+ this.v = v;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visit(RelNode node, int ordinal, RelNode parent) {
+ v.visit(node, ordinal, parent);
+
+ super.visit(node, ordinal, parent);
+ }
+ }
+
+ /** Passes the given node itself (ordinal {@code -1}, no parent) to the callback, then walks its children. */
+ protected static void relTreeVisit(RelNode n, TestVisitor v) {
+ v.visit(n, -1, null);
+
+ n.childrenAccept(new TestRelVisitor(v));
+ }
+
+ /** Returns the first element of {@link #findNodes(RelNode, Predicate)} for the given plan and predicate. */
+ public static <T extends RelNode> T findFirstNode(RelNode plan, Predicate<RelNode> pred) {
+ return F.first(findNodes(plan, pred));
+ }
+
+    /** Collects, in visiting order, every node of the plan (the root included) that satisfies the predicate. */
+    public static <T extends RelNode> List<T> findNodes(RelNode plan, Predicate<RelNode> pred) {
+        List<T> ret = new ArrayList<>();
+
+        // RelVisitor#go starts the walk from the plan root itself; plan.childrenAccept() would only
+        // look below the root and silently miss a matching root node.
+        new RelVisitor() {
+            @Override public void visit(RelNode node, int ordinal, RelNode parent) {
+                if (pred.test(node))
+                    ret.add((T)node);
+
+                super.visit(node, ordinal, parent);
+            }
+        }.go(plan);
+
+        return ret;
+    }
+
+ /** Creates a predicate accepting the nodes that are instances of the given class. */
+ public static <T extends RelNode> Predicate<RelNode> byClass(Class<T> cls) {
+ return node -> cls.isInstance(node);
+ }
+
+ /** Creates a predicate accepting the instances of the given class that also satisfy {@code pred}. */
+ public static <T extends RelNode> Predicate<RelNode> byClass(Class<T> cls, Predicate<RelNode> pred) {
+ return node -> cls.isInstance(node) && pred.test(node);
+ }
+
+    /**
+     * Parses, validates and converts the query to a relational tree, then optimizes it into an Ignite physical plan.
+     *
+     * @param sql Query text.
+     * @param publicSchema Schema to register under the {@code PUBLIC} name.
+     * @param disabledRules Rules to disable, as passed to {@code IgnitePlanner#setDisabledRules}.
+     * @return Optimized physical plan.
+     * @throws Exception If planning fails.
+     */
+    protected IgniteRel physicalPlan(String sql, IgniteSchema publicSchema, String... disabledRules) throws Exception {
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(F.first(nodes))
+            .originatingNodeId(F.first(nodes))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .logger(log)
+            .query(sql)
+            .topologyVersion(AffinityTopologyVersion.NONE)
+            .build();
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse and validate.
+            SqlNode validated = planner.validate(planner.parse(qry));
+
+            // Convert to a relational operators graph.
+            RelNode rel = planner.rel(validated).rel;
+
+            assertNotNull(rel);
+
+            // Transformation chain
+            RelTraitSet desired = rel.getTraitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .replace(CorrelationTrait.UNCORRELATED)
+                .replace(RewindabilityTrait.ONE_WAY)
+                .simplify();
+
+            planner.setDisabledRules(ImmutableSet.copyOf(disabledRules));
+
+            // On failure the planner dump is printed before the error is propagated to the caller.
+            try {
+                return planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+            }
+            catch (Throwable ex) {
+                System.err.println(planner.dump());
+
+                throw ex;
+            }
+        }
+    }
+
+    /**
+     * Parses, validates and converts the query to a relational tree without optimizing it.
+     *
+     * @param sql Query text.
+     * @param publicSchema Schema to register under the {@code PUBLIC} name.
+     * @param disabledRules Not used by this method.
+     * @return Root node of the converted relational tree.
+     */
+    protected RelNode originalLogicalTree(String sql, IgniteSchema publicSchema, String... disabledRules) throws Exception {
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(F.first(nodes))
+            .originatingNodeId(F.first(nodes))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .logger(log)
+            .query(sql)
+            .topologyVersion(AffinityTopologyVersion.NONE)
+            .build();
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse and validate.
+            SqlNode validated = planner.validate(planner.parse(qry));
+
+            // Convert to a relational operators graph.
+            RelNode rel = planner.rel(validated).rel;
+
+            assertNotNull(rel);
+
+            return rel;
+        }
+    }
+
+    /** Splits a copy of the plan into fragments and checks that every fragment root survives a JSON round trip. */
+    protected void checkSplitAndSerialization(IgniteRel rel, IgniteSchema publicSchema) {
+        rel = Cloner.clone(rel);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        List<Fragment> fragments = new Splitter().go(rel);
+
+        List<String> serialized = fragments.stream()
+            .map(fragment -> toJson(fragment.root()))
+            .collect(Collectors.toList());
+
+        assertNotNull(serialized);
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(F.first(nodes))
+            .originatingNodeId(F.first(nodes))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .logger(log)
+            .topologyVersion(AffinityTopologyVersion.NONE)
+            .build();
+
+        List<RelNode> deserializedNodes = new ArrayList<>();
+
+        try (IgnitePlanner ignored = ctx.planner()) {
+            for (String json : serialized) {
+                RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
+                deserializedNodes.add(reader.read(json));
+            }
+        }
+
+        List<RelNode> expectedRels = new ArrayList<>(fragments.size());
+
+        for (Fragment fragment : fragments)
+            expectedRels.add(fragment.root());
+
+        assertEquals("Invalid deserialization fragments count", expectedRels.size(), deserializedNodes.size());
+
+        for (int i = 0; i < expectedRels.size(); ++i) {
+            RelNode expected = expectedRels.get(i);
+            RelNode deserialized = deserializedNodes.get(i);
+
+            clearTraits(expected);
+            clearTraits(deserialized);
+
+            if (!expected.deepEquals(deserialized))
+                assertTrue(
+                    "Invalid serialization / deserialization.\n" +
+                        "Expected:\n" + RelOptUtil.toString(expected) +
+                        "Deserialized:\n" + RelOptUtil.toString(deserialized),
+                    expected.deepEquals(deserialized)
+                );
+        }
+    }
+
+ /** Sets the {@code traitSet} field of the node and, recursively, of all its inputs to an empty trait set. */
+ protected void clearTraits(RelNode rel) {
+ GridTestUtils.setFieldValue(rel, AbstractRelNode.class, "traitSet", RelTraitSet.createEmpty());
+ rel.getInputs().forEach(this::clearTraits);
+ }
+
+ /** Returns only the first node when {@code single} is set, all four nodes otherwise; {@code topVer} and {@code filter} are ignored. */
+ protected List<UUID> intermediateMapping(@NotNull AffinityTopologyVersion topVer, boolean single, @Nullable Predicate<ClusterNode> filter) {
+ return single ? select(nodes, 0) : select(nodes, 0, 1, 2, 3);
+ }
+
+    /** Returns a new list holding the elements of {@code src} at the given positions, in the given order. */
+    public static <T> List<T> select(List<T> src, int... idxs) {
+        List<T> picked = new ArrayList<>(idxs.length);
+
+        for (int i = 0; i < idxs.length; i++)
+            picked.add(src.get(idxs[i]));
+
+        return picked;
+    }
+
+    /**
+     * Creates a row from the given values; each column type is the value's runtime class ({@code Object} for null).
+     *
+     * @param ctx Execution context whose row handler provides the row factory.
+     * @param requiredColumns Ignored: the values alone define the row layout, so no bit can index past them.
+     * @param fields Field values, in column order.
+     * @param <Row> Row type.
+     * @return Created row.
+     */
+    protected <Row> Row row(ExecutionContext<Row> ctx, ImmutableBitSet requiredColumns, Object... fields) {
+        Type[] types = new Type[fields.length];
+
+        for (int i = 0; i < fields.length; i++)
+            types[i] = fields[i] == null ? Object.class : fields[i].getClass();
+
+        return ctx.rowHandler().factory(types).create(fields);
+    }
+
+ /** Planner-level table stub: serves row type, statistics and indexes; most other methods throw {@link AssertionError}. */
+ abstract static class TestTable implements IgniteTable {
+ /** Prototype of the table row type. */
+ private final RelProtoDataType protoType;
+
+ /** Registered indexes by name. */
+ private final Map<String, IgniteIndex> indexes = new HashMap<>();
+
+ /** Rewindability trait put on the scans created by {@code toRel}. */
+ private final RewindabilityTrait rewindable;
+
+ /** Row count reported by {@link #getStatistic()}. */
+ private final double rowCnt;
+
+ /** Creates a rewindable table with the default row count. */
+ TestTable(RelDataType type) {
+ this(type, RewindabilityTrait.REWINDABLE);
+ }
+
+ /** Creates a table with the given rewindability and a row count of 100. */
+ TestTable(RelDataType type, RewindabilityTrait rewindable) {
+ this(type, rewindable, 100.0);
+ }
+
+ /** Creates a table with the given row type, rewindability and row count. */
+ TestTable(RelDataType type, RewindabilityTrait rewindable, double rowCnt) {
+ protoType = RelDataTypeImpl.proto(type);
+ this.rewindable = rewindable;
+ this.rowCnt = rowCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogicalTableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl) {
+ RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
+ .replaceIf(RewindabilityTraitDef.INSTANCE, () -> rewindable)
+ .replaceIf(DistributionTraitDef.INSTANCE, this::distribution);
+
+ return IgniteLogicalTableScan.create(cluster, traitSet, relOptTbl, null, null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogicalIndexScan toRel(RelOptCluster cluster, RelOptTable relOptTbl, String idxName) {
+ RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
+ .replaceIf(DistributionTraitDef.INSTANCE, this::distribution)
+ .replaceIf(RewindabilityTraitDef.INSTANCE, () -> rewindable)
+ .replaceIf(RelCollationTraitDef.INSTANCE, getIndex(idxName)::collation);
+
+ return IgniteLogicalIndexScan.create(cluster, traitSet, relOptTbl, idxName, null, null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
+ RelDataType rowType = protoType.apply(typeFactory);
+
+ if (bitSet != null) {
+ RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(typeFactory);
+ for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1))
+ b.add(rowType.getFieldList().get(i));
+ rowType = b.build();
+ }
+
+ return rowType;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Statistic getStatistic() {
+ return new Statistic() {
+ /** {@inheritDoc} */
+ @Override public Double getRowCount() {
+ return rowCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isKey(ImmutableBitSet cols) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<ImmutableBitSet> getKeys() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<RelReferentialConstraint> getReferentialConstraints() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<RelCollation> getCollations() {
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDistribution getDistribution() {
+ throw new AssertionError();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public <Row> Iterable<Row> scan(
+ ExecutionContext<Row> execCtx,
+ CollocationGroup group, Predicate<Row> filter,
+ Function<Row, Row> transformer,
+ ImmutableBitSet bitSet
+ ) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Schema.TableType getJdbcTableType() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isRolledUp(String col) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rolledUpColumnValidInsideAgg(
+ String column,
+ SqlCall call,
+ SqlNode parent,
+ CalciteConnectionConfig config
+ ) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public CollocationGroup colocationGroup(PlanningContext ctx) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteDistribution distribution() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public TableDescriptor descriptor() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, IgniteIndex> indexes() {
+ return Collections.unmodifiableMap(indexes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addIndex(IgniteIndex idxTbl) {
+ indexes.put(idxTbl.name(), idxTbl);
+ }
+
+ /** Registers an index with the given collation and name and returns this table for chaining. */
+ public TestTable addIndex(RelCollation collation, String name) {
+ indexes.put(name, new IgniteIndex(collation, name, null, this));
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteIndex getIndex(String idxName) {
+ return indexes.get(idxName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeIndex(String idxName) {
+ throw new AssertionError();
+ }
+ }
+
+ /** Message service that sends outgoing messages through a {@link TestIoManager} and treats every node as alive. */
+ static class TestMessageServiceImpl extends MessageServiceImpl {
+ /** IO manager that receives the outgoing messages. */
+ private final TestIoManager mgr;
+
+ /** Creates the service for the given kernal context and IO manager. */
+ TestMessageServiceImpl(GridTestKernalContext kernal, TestIoManager mgr) {
+ super(kernal);
+ this.mgr = mgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void send(UUID nodeId, CalciteMessage msg) {
+ mgr.send(localNodeId(), nodeId, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean alive(UUID nodeId) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void prepareMarshal(Message msg) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void prepareUnmarshal(Message msg) {
+ // No-op.
+ }
+ }
+
+ /** */
+ class TestFailureProcessor extends FailureProcessor {
+ /** */
+ TestFailureProcessor(GridTestKernalContext kernal) {
+ super(kernal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean process(FailureContext failureCtx) {
+ Throwable ex = failureContext().error();
+ log().error(ex.getMessage(), ex);
+
+ lastE = ex;
+
+ return true;
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java
new file mode 100644
index 0000000..6c1e3bf
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.junit.Test;
+
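+/**
+ * Planner tests for index scan bounds in join plans built with the merge and nested loop join converters disabled.
+ */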
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
+ /**
+ * Checks an equi-join: the correlated nested loop join is applicable, so the index scan must carry bounds on JID.
+ */
+ @Test
+ public void testValidIndexExpressions() throws Exception {
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ publicSchema.addTable(
+ "T0",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ }
+ );
+
+ publicSchema.addTable(
+ "T1",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t1_jid_idx") // Index on (JID, ID).
+ );
+
+ String sql = "select * " +
+ "from t0 " +
+ "join t1 on t0.jid = t1.jid";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule" // Presumably the rules to disable.
+ );
+
+ assertNotNull(phys);
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteIndexScan idxScan = findFirstNode(phys, byClass(IgniteIndexScan.class));
+
+ List<RexNode> lBound = idxScan.lowerBound();
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), lBound);
+ assertEquals(3, lBound.size()); // Presumably one entry per table column (ID, JID, VAL).
+
+ assertTrue(((RexLiteral)lBound.get(0)).isNull());
+ assertTrue(((RexLiteral)lBound.get(2)).isNull());
+ assertTrue(lBound.get(1) instanceof RexFieldAccess); // Only position 1 (JID) carries a bound expression.
+
+ List<RexNode> uBound = idxScan.upperBound();
+
+ assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), uBound);
+ assertEquals(3, uBound.size());
+
+ assertTrue(((RexLiteral)uBound.get(0)).isNull());
+ assertTrue(((RexLiteral)uBound.get(2)).isNull());
+ assertTrue(uBound.get(1) instanceof RexFieldAccess); // Same shape as the lower bound.
+ }
+
+ /**
+ * Checks a join with a non-equi condition.
+ * The current implementation of the correlated nested loop join is not applicable to such a case.
+ */
+ @Test
+ public void testInvalidIndexExpressions() throws Exception {
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ publicSchema.addTable(
+ "T0",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t0_jid_idx") // Index on (JID, ID).
+ );
+
+ publicSchema.addTable(
+ "T1",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t1_jid_idx") // Index on (JID, ID).
+ );
+
+ String sql = "select * " +
+ "from t0 " +
+ "join t1 on t0.jid + 2 > t1.jid * 2";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule" // Presumably the rules to disable.
+ );
+
+ assertNotNull(phys); // Only requires that a plan is produced; its shape is not inspected.
+
+ checkSplitAndSerialization(phys, publicSchema);
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexSpoolPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexSpoolPlannerTest.java
new file mode 100644
index 0000000..d9eaae6
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexSpoolPlannerTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.junit.Test;
+
+/**
+ * Planner tests for the index condition bounds of {@link IgniteIndexSpool} in join plans.
+ */
+@SuppressWarnings({"FieldCanBeLocal"})
+public class IndexSpoolPlannerTest extends AbstractPlannerTest {
+ /**
+ * Checks an equi-join on non-collocated fields.
+ * The correlated nested loop join is applicable to this case only together with an index spool.
+ */
+ @Test
+ public void test() throws Exception {
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ publicSchema.addTable(
+ "T0",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "T0", "hash"); // Presumably keyed by column 0 (ID) - TODO confirm.
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t0_jid_idx") // Index on (JID, ID).
+ );
+
+ publicSchema.addTable(
+ "T1",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "T1", "hash"); // Presumably keyed by column 0 (ID) - TODO confirm.
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t1_jid_idx") // Index on (JID, ID).
+ );
+
+ String sql = "select * " +
+ "from t0 " +
+ "join t1 on t0.jid = t1.jid";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ "MergeJoinConverter", "NestedLoopJoinConverter" // Presumably the rules to disable.
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteIndexSpool.class));
+
+ List<RexNode> lBound = idxSpool.indexCondition().lowerBound();
+
+ assertNotNull(lBound);
+ assertEquals(3, lBound.size()); // Presumably one entry per table column (ID, JID, VAL).
+
+ assertTrue(((RexLiteral)lBound.get(0)).isNull());
+ assertTrue(((RexLiteral)lBound.get(2)).isNull());
+ assertTrue(lBound.get(1) instanceof RexFieldAccess); // Only position 1 (JID) carries a bound expression.
+
+ List<RexNode> uBound = idxSpool.indexCondition().upperBound();
+
+ assertNotNull(uBound);
+ assertEquals(3, uBound.size());
+
+ assertTrue(((RexLiteral)uBound.get(0)).isNull());
+ assertTrue(((RexLiteral)uBound.get(2)).isNull());
+ assertTrue(uBound.get(1) instanceof RexFieldAccess); // Same shape as the lower bound.
+ }
+
+ /**
+ * Checks the case when the existing index (collation) cannot be applied to the whole join condition
+ * but can be used for a part of it.
+ */
+ @Test
+ public void testPartialIndexForCondition() throws Exception {
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ publicSchema.addTable(
+ "T0",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID0", f.createJavaType(Integer.class))
+ .add("JID1", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "T0", "hash");
+ }
+ }
+ );
+
+ publicSchema.addTable(
+ "T1",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID0", f.createJavaType(Integer.class))
+ .add("JID1", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "T1", "hash");
+ }
+ }
+ .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t1_jid0_idx") // Index on (JID0, ID): covers JID0 only.
+ );
+
+ String sql = "select * " +
+ "from t0 " +
+ "join t1 on t0.jid0 = t1.jid0 and t0.jid1 = t1.jid1";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ "MergeJoinConverter", "NestedLoopJoinConverter"
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteIndexSpool.class));
+
+ List<RexNode> lBound = idxSpool.indexCondition().lowerBound();
+
+ assertNotNull(lBound);
+ assertEquals(4, lBound.size());
+
+ assertTrue(((RexLiteral)lBound.get(0)).isNull());
+ assertTrue(((RexLiteral)lBound.get(2)).isNull());
+ assertTrue(((RexLiteral)lBound.get(3)).isNull());
+ assertTrue(lBound.get(1) instanceof RexFieldAccess); // Only position 1 (JID0) carries a bound expression.
+
+ List<RexNode> uBound = idxSpool.indexCondition().upperBound();
+
+ assertNotNull(uBound);
+ assertEquals(4, uBound.size());
+
+ assertTrue(((RexLiteral)uBound.get(0)).isNull());
+ assertTrue(((RexLiteral)uBound.get(2)).isNull()); // Was re-checking lBound, leaving the upper bound unverified.
+ assertTrue(((RexLiteral)uBound.get(3)).isNull()); // Was re-checking lBound, leaving the upper bound unverified.
+ assertTrue(uBound.get(1) instanceof RexFieldAccess);
+ }
+
+ /**
+ * Checks an equi-join on non-collocated fields when neither table has an index.
+ */
+ @Test
+ public void testSourceWithoutCollation() throws Exception {
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ publicSchema.addTable(
+ "T0",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "T0", "hash"); // Presumably keyed by column 0 (ID) - TODO confirm.
+ }
+ }
+ );
+
+ publicSchema.addTable(
+ "T1",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "T1", "hash"); // Presumably keyed by column 0 (ID) - TODO confirm.
+ }
+ }
+ );
+
+ String sql = "select * " +
+ "from t0 " +
+ "join t1 on t0.jid = t1.jid";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ "MergeJoinConverter", "NestedLoopJoinConverter" // Presumably the rules to disable.
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+
+ IgniteIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteIndexSpool.class));
+
+ assertTrue(idxSpool.getInput() instanceof IgniteSort); // With no index on the tables, the spool must be fed by a sort.
+
+ List<RexNode> lBound = idxSpool.indexCondition().lowerBound();
+
+ assertNotNull(lBound);
+ assertEquals(3, lBound.size()); // Presumably one entry per table column (ID, JID, VAL).
+
+ assertTrue(((RexLiteral)lBound.get(0)).isNull());
+ assertTrue(((RexLiteral)lBound.get(2)).isNull());
+ assertTrue(lBound.get(1) instanceof RexFieldAccess); // Only position 1 (JID) carries a bound expression.
+
+ List<RexNode> uBound = idxSpool.indexCondition().upperBound();
+
+ assertNotNull(uBound);
+ assertEquals(3, uBound.size());
+
+ assertTrue(((RexLiteral)uBound.get(0)).isNull());
+ assertTrue(((RexLiteral)uBound.get(2)).isNull());
+ assertTrue(uBound.get(1) instanceof RexFieldAccess); // Same shape as the lower bound.
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
similarity index 84%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 7f8a4b8..740af2b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -15,15 +15,11 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.planner;
-import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,39 +29,23 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelReferentialConstraint;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeImpl;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -76,7 +56,6 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
-import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
import org.apache.ignite.internal.processors.query.calcite.metadata.CollocationGroup;
@@ -95,13 +74,8 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
@@ -113,15 +87,10 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import static org.apache.calcite.tools.Frameworks.createRootSchema;
@@ -135,35 +104,7 @@ import static org.apache.ignite.internal.processors.query.calcite.externalize.Re
*/
//@WithSystemProperty(key = "calcite.debug", value = "true")
@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
-public class PlannerTest extends GridCommonAbstractTest {
- /** */
- private List<UUID> nodes;
-
- /** */
- private List<QueryTaskExecutorImpl> executors;
-
- /** */
- private volatile Throwable lastE;
-
- /** */
- @Before
- public void setup() {
- nodes = new ArrayList<>(4);
-
- for (int i = 0; i < 4; i++)
- nodes.add(UUID.randomUUID());
- }
-
- /** */
- @After
- public void tearDown() throws Throwable {
- if (!F.isEmpty(executors))
- executors.forEach(QueryTaskExecutorImpl::tearDown);
-
- if (lastE != null)
- throw lastE;
- }
-
+public class PlannerTest extends AbstractPlannerTest {
/**
* @throws Exception If failed.
*/
@@ -797,11 +738,11 @@ public class PlannerTest extends GridCommonAbstractTest {
Supplier<Row> lowerIdxConditions,
Supplier<Row> upperIdxConditions,
Function<Row, Row> rowTransformer,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable ImmutableBitSet requiredColumns
) {
return Linq4j.asEnumerable(Arrays.asList(
- row(execCtx, requiredColunms, 0, "Igor", 0),
- row(execCtx, requiredColunms, 1, "Roman", 0)
+ row(execCtx, requiredColumns, 0, "Igor", 0),
+ row(execCtx, requiredColumns, 1, "Roman", 0)
));
}
};
@@ -837,11 +778,11 @@ public class PlannerTest extends GridCommonAbstractTest {
Supplier<Row> lowerIdxConditions,
Supplier<Row> upperIdxConditions,
Function<Row, Row> rowTransformer,
- @Nullable ImmutableBitSet requiredColunms
+ @Nullable ImmutableBitSet requiredColumns
) {
return Linq4j.asEnumerable(Arrays.asList(
- row(execCtx, requiredColunms, 0, "Calcite", 1),
- row(execCtx, requiredColunms, 1, "Ignite", 1)
+ row(execCtx, requiredColumns, 0, "Calcite", 1),
+ row(execCtx, requiredColumns, 1, "Ignite", 1)
));
}
};
@@ -2649,7 +2590,7 @@ public class PlannerTest extends GridCommonAbstractTest {
.replace(CorrelationTrait.UNCORRELATED)
.simplify();
- RelNode phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+ IgniteRel phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
assertNotNull(phys);
assertEquals(
@@ -2658,6 +2599,8 @@ public class PlannerTest extends GridCommonAbstractTest {
" IgniteTableScan(table=[[PUBLIC, DEPT]], requiredColumns=[{0}])\n" +
" IgniteTableScan(table=[[PUBLIC, EMP]], filters=[=(CAST(+($cor1.DEPTNO, $t0)):INTEGER, 2)], requiredColumns=[{2}])\n",
RelOptUtil.toString(phys));
+
+ checkSplitAndSerialization(phys, publicSchema);
}
}
@@ -2703,7 +2646,7 @@ public class PlannerTest extends GridCommonAbstractTest {
String sql = "select * from dept d join emp e on d.deptno = e.deptno and e.name = d.name order by e.name, d.deptno";
- RelNode phys = physicalPlan(sql, publicSchema);
+ IgniteRel phys = physicalPlan(sql, publicSchema);
assertNotNull(phys);
assertEquals("" +
@@ -2711,6 +2654,8 @@ public class PlannerTest extends GridCommonAbstractTest {
" IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n" +
" IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n",
RelOptUtil.toString(phys));
+
+ checkSplitAndSerialization(phys, publicSchema);
}
/** */
@@ -2768,128 +2713,6 @@ public class PlannerTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Test
- public void tableSpoolDistributed() throws Exception {
- IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
- TestTable t0 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("JID", f.createJavaType(Integer.class))
- .add("VAL", f.createJavaType(String.class))
- .build()) {
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "T0", "hash");
- }
- };
-
- TestTable t1 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("JID", f.createJavaType(Integer.class))
- .add("VAL", f.createJavaType(String.class))
- .build()) {
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.affinity(0, "T1", "hash");
- }
- };
-
- IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
- publicSchema.addTable("T0", t0);
- publicSchema.addTable("T1", t1);
-
- String sql = "select * " +
- "from t0 " +
- "join t1 on t0.jid = t1.jid";
-
- RelNode phys = physicalPlan(sql, publicSchema, "NestedLoopJoinConverter", "MergeJoinConverter");
-
- assertNotNull(phys);
-
- AtomicInteger spoolCnt = new AtomicInteger();
-
- phys.childrenAccept(
- new RelVisitor() {
- @Override public void visit(RelNode node, int ordinal, RelNode parent) {
- if (node instanceof IgniteTableSpool)
- spoolCnt.incrementAndGet();
-
- super.visit(node, ordinal, parent);
- }
- }
- );
-
- assertEquals(1, spoolCnt.get());
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void tableSpoolBroadcastNotRewindable() throws Exception {
- IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
- TestTable t0 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("JID", f.createJavaType(Integer.class))
- .add("VAL", f.createJavaType(String.class))
- .build(),
- RewindabilityTrait.ONE_WAY) {
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.broadcast();
- }
- };
-
- TestTable t1 = new TestTable(
- new RelDataTypeFactory.Builder(f)
- .add("ID", f.createJavaType(Integer.class))
- .add("JID", f.createJavaType(Integer.class))
- .add("VAL", f.createJavaType(String.class))
- .build(),
- RewindabilityTrait.ONE_WAY) {
-
- @Override public IgniteDistribution distribution() {
- return IgniteDistributions.broadcast();
- }
- };
-
- IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
- publicSchema.addTable("T0", t0);
- publicSchema.addTable("T1", t1);
-
- String sql = "select * " +
- "from t0 " +
- "join t1 on t0.jid = t1.jid";
-
- RelNode phys = physicalPlan(sql, publicSchema, "NestedLoopJoinConverter", "MergeJoinConverter");
-
- assertNotNull(phys);
-
- AtomicInteger spoolCnt = new AtomicInteger();
-
- phys.childrenAccept(
- new RelVisitor() {
- @Override public void visit(RelNode node, int ordinal, RelNode parent) {
- if (node instanceof IgniteTableSpool)
- spoolCnt.incrementAndGet();
-
- super.visit(node, ordinal, parent);
- }
- }
- );
-
- assertEquals(1, spoolCnt.get());
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
public void testLimit() throws Exception {
IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
@@ -2910,7 +2733,7 @@ public class PlannerTest extends GridCommonAbstractTest {
String sql = "SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY";
{
- RelNode phys = physicalPlan(sql, publicSchema);
+ IgniteRel phys = physicalPlan(sql, publicSchema);
assertNotNull(phys);
@@ -2928,12 +2751,14 @@ public class PlannerTest extends GridCommonAbstractTest {
assertEquals("Invalid plan: \n" + RelOptUtil.toString(phys), 1, limit.get());
assertFalse("Invalid plan: \n" + RelOptUtil.toString(phys), sort.get());
+
+ checkSplitAndSerialization(phys, publicSchema);
}
sql = "SELECT * FROM TEST ORDER BY ID OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY";
{
- RelNode phys = physicalPlan(sql, publicSchema);
+ IgniteRel phys = physicalPlan(sql, publicSchema);
assertNotNull(phys);
@@ -2951,344 +2776,8 @@ public class PlannerTest extends GridCommonAbstractTest {
assertEquals("Invalid plan: \n" + RelOptUtil.toString(phys), 1, limit.get());
assertTrue("Invalid plan: \n" + RelOptUtil.toString(phys), sort.get());
- }
- }
-
- /** */
- interface TestVisitor {
- public void visit(RelNode node, int ordinal, RelNode parent);
- }
-
- /** */
- private static class TestRelVisitor extends RelVisitor {
- /** */
- final TestVisitor v;
-
- /** */
- TestRelVisitor(TestVisitor v) {
- this.v = v;
- }
-
- /** {@inheritDoc} */
- @Override public void visit(RelNode node, int ordinal, RelNode parent) {
- v.visit(node, ordinal, parent);
-
- super.visit(node, ordinal, parent);
- }
- }
-
- /** */
- protected static void relTreeVisit(RelNode n, TestVisitor v) {
- v.visit(n, -1, null);
-
- n.childrenAccept(new TestRelVisitor(v));
- }
-
- /** */
- private IgniteRel physicalPlan(String sql, IgniteSchema publicSchema, String... disabledRules) throws Exception {
- SchemaPlus schema = createRootSchema(false)
- .add("PUBLIC", publicSchema);
-
- RelTraitDef<?>[] traitDefs = {
- DistributionTraitDef.INSTANCE,
- ConventionTraitDef.INSTANCE,
- RelCollationTraitDef.INSTANCE,
- RewindabilityTraitDef.INSTANCE,
- CorrelationTraitDef.INSTANCE
- };
-
- PlanningContext ctx = PlanningContext.builder()
- .localNodeId(F.first(nodes))
- .originatingNodeId(F.first(nodes))
- .parentContext(Contexts.empty())
- .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .traitDefs(traitDefs)
- .build())
- .logger(log)
- .query(sql)
- .topologyVersion(AffinityTopologyVersion.NONE)
- .build();
-
- RelRoot relRoot;
-
- try (IgnitePlanner planner = ctx.planner()) {
- assertNotNull(planner);
-
- String qry = ctx.query();
-
- assertNotNull(qry);
-
- // Parse
- SqlNode sqlNode = planner.parse(qry);
-
- // Validate
- sqlNode = planner.validate(sqlNode);
-
- // Convert to Relational operators graph
- relRoot = planner.rel(sqlNode);
-
- RelNode rel = relRoot.rel;
-
- assertNotNull(rel);
-
- // Transformation chain
- RelTraitSet desired = rel.getTraitSet()
- .replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
- .simplify();
-
- planner.setDisabledRules(ImmutableSet.copyOf(disabledRules));
-
- return planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
- }
- }
-
- /** */
- private List<UUID> intermediateMapping(@NotNull AffinityTopologyVersion topVer, boolean single, @Nullable Predicate<ClusterNode> filter) {
- return single ? select(nodes, 0) : select(nodes, 0, 1, 2, 3);
- }
-
- /** */
- private static <T> List<T> select(List<T> src, int... idxs) {
- ArrayList<T> res = new ArrayList<>(idxs.length);
-
- for (int idx : idxs)
- res.add(src.get(idx));
-
- return res;
- }
-
- /** */
- private <Row> Row row(ExecutionContext<Row> ctx, ImmutableBitSet requiredColunms, Object... fields) {
- Type[] types = new Type[fields.length];
- for (int i = 0; i < fields.length; i++)
- types[i] = fields[i] == null ? Object.class : fields[i].getClass();
-
- if (requiredColunms == null) {
- for (int i = 0; i < fields.length; i++)
- types[i] = fields[i] == null ? Object.class : fields[i].getClass();
- }
- else {
- for (int i = 0, j = requiredColunms.nextSetBit(0); j != -1; j = requiredColunms.nextSetBit(j + 1), i++)
- types[i] = fields[i] == null ? Object.class : fields[i].getClass();
- }
-
- return ctx.rowHandler().factory(types).create(fields);
- }
-
- /** */
- private abstract static class TestTable implements IgniteTable {
- /** */
- private final RelProtoDataType protoType;
-
- /** */
- private final Map<String, IgniteIndex> indexes = new HashMap<>();
-
- /** */
- private final RewindabilityTrait rewindable;
-
- /** */
- private final double rowCnt;
-
- /** */
- private TestTable(RelDataType type) {
- this(type, RewindabilityTrait.REWINDABLE);
- }
-
- /** */
- private TestTable(RelDataType type, RewindabilityTrait rewindable) {
- this(type, rewindable, 100.0);
- }
-
- /** */
- private TestTable(RelDataType type, RewindabilityTrait rewindable, double rowCnt) {
- protoType = RelDataTypeImpl.proto(type);
- this.rewindable = rewindable;
- this.rowCnt = rowCnt;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogicalTableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl) {
- RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
- .replaceIf(RewindabilityTraitDef.INSTANCE, () -> rewindable)
- .replaceIf(DistributionTraitDef.INSTANCE, this::distribution);
-
- return IgniteLogicalTableScan.create(cluster, traitSet, relOptTbl, null, null, null);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogicalIndexScan toRel(RelOptCluster cluster, RelOptTable relOptTbl, String idxName) {
- RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
- .replaceIf(DistributionTraitDef.INSTANCE, this::distribution)
- .replaceIf(RewindabilityTraitDef.INSTANCE, () -> rewindable)
- .replaceIf(RelCollationTraitDef.INSTANCE, getIndex(idxName)::collation);
-
- return IgniteLogicalIndexScan.create(cluster, traitSet, relOptTbl, idxName, null, null, null);
- }
-
- /** {@inheritDoc} */
- @Override public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
- RelDataType rowType = protoType.apply(typeFactory);
-
- if (bitSet != null) {
- RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(typeFactory);
- for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1))
- b.add(rowType.getFieldList().get(i));
- rowType = b.build();
- }
-
- return rowType;
- }
-
- /** {@inheritDoc} */
- @Override public Statistic getStatistic() {
- return new Statistic() {
- /** {@inheritDoc */
- @Override public Double getRowCount() {
- return rowCnt;
- }
-
- /** {@inheritDoc */
- @Override public boolean isKey(ImmutableBitSet cols) {
- return false;
- }
-
- /** {@inheritDoc */
- @Override public List<ImmutableBitSet> getKeys() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc */
- @Override public List<RelReferentialConstraint> getReferentialConstraints() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc */
- @Override public List<RelCollation> getCollations() {
- return Collections.emptyList();
- }
-
- /** {@inheritDoc */
- @Override public RelDistribution getDistribution() {
- throw new AssertionError();
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public <Row> Iterable<Row> scan(
- ExecutionContext<Row> execCtx,
- CollocationGroup group, Predicate<Row> filter,
- Function<Row, Row> transformer,
- ImmutableBitSet bitSet
- ) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public Schema.TableType getJdbcTableType() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isRolledUp(String col) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean rolledUpColumnValidInsideAgg(
- String column,
- SqlCall call,
- SqlNode parent,
- CalciteConnectionConfig config
- ) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public CollocationGroup colocationGroup(PlanningContext ctx) {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteDistribution distribution() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public TableDescriptor descriptor() {
- throw new AssertionError();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, IgniteIndex> indexes() {
- return Collections.unmodifiableMap(indexes);
- }
-
- /** {@inheritDoc} */
- @Override public void addIndex(IgniteIndex idxTbl) {
- indexes.put(idxTbl.name(), idxTbl);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteIndex getIndex(String idxName) {
- return indexes.get(idxName);
- }
-
- /** {@inheritDoc} */
- @Override public void removeIndex(String idxName) {
- throw new AssertionError();
- }
- }
-
- /** */
- private static class TestMessageServiceImpl extends MessageServiceImpl {
- /** */
- private final TestIoManager mgr;
-
- /** */
- private TestMessageServiceImpl(GridTestKernalContext kernal, TestIoManager mgr) {
- super(kernal);
- this.mgr = mgr;
- }
-
- /** {@inheritDoc} */
- @Override public void send(UUID nodeId, CalciteMessage msg) {
- mgr.send(localNodeId(), nodeId, msg);
- }
-
- /** {@inheritDoc} */
- @Override public boolean alive(UUID nodeId) {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected void prepareMarshal(Message msg) {
- // No-op;
- }
-
- /** {@inheritDoc} */
- @Override protected void prepareUnmarshal(Message msg) {
- // No-op;
- }
- }
-
- /** */
- private class TestFailureProcessor extends FailureProcessor {
- /** */
- private TestFailureProcessor(GridTestKernalContext kernal) {
- super(kernal);
- }
-
- /** {@inheritDoc} */
- @Override public boolean process(FailureContext failureCtx) {
- Throwable ex = failureContext().error();
- log().error(ex.getMessage(), ex);
-
- lastE = ex;
- return true;
+ checkSplitAndSerialization(phys, publicSchema);
}
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableSpoolPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableSpoolPlannerTest.java
new file mode 100644
index 0000000..c472142
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableSpoolPlannerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.junit.Test;
+
+/**
+ * Table spool test.
+ */
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class TableSpoolPlannerTest extends AbstractPlannerTest {
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void tableSpoolDistributed() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable t0 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "T0", "hash");
+ }
+ };
+
+ TestTable t1 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "T1", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("T0", t0);
+ publicSchema.addTable("T1", t1);
+
+ String sql = "select * " +
+ "from t0 " +
+ "join t1 on t0.jid = t1.jid";
+
+ RelNode phys = physicalPlan(sql, publicSchema,
+ "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule");
+
+ assertNotNull(phys);
+
+ IgniteTableSpool tblSpool = findFirstNode(phys, byClass(IgniteTableSpool.class));
+
+ assertNotNull("Invalid plan:\n" + RelOptUtil.toString(phys), tblSpool);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void tableSpoolBroadcastNotRewindable() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable t0 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build(),
+ RewindabilityTrait.ONE_WAY) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ TestTable t1 = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("JID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build(),
+ RewindabilityTrait.ONE_WAY) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("T0", t0);
+ publicSchema.addTable("T1", t1);
+
+ String sql = "select * " +
+ "from t0 " +
+ "join t1 on t0.jid = t1.jid";
+
+ RelNode phys = physicalPlan(sql, publicSchema,
+ "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule");
+
+ assertNotNull(phys);
+
+ IgniteTableSpool tblSpool = findFirstNode(phys, byClass(IgniteTableSpool.class));
+
+ assertNotNull("Invalid plan:\n" + RelOptUtil.toString(phys), tblSpool);
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
similarity index 53%
copy from modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
copy to modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index c346ada..4de7817 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -17,22 +17,12 @@
package org.apache.ignite.testsuites;
-import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
-import org.apache.ignite.internal.processors.query.calcite.CancelTest;
-import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
-import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
-import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
-import org.apache.ignite.internal.processors.query.calcite.TableSpoolTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
-import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
-import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -41,22 +31,12 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
- PlannerTest.class,
- OrToUnionRuleTest.class,
- ProjectScanMergeRuleTest.class,
ExecutionTest.class,
+ ContinuousExecutionTest.class,
MergeJoinExecutionTest.class,
NestedLoopJoinExecutionTest.class,
- ClosableIteratorsHolderTest.class,
- ContinuousExecutionTest.class,
- CalciteQueryProcessorTest.class,
- JdbcQueryTest.class,
- CalciteBasicSecondaryIndexIntegrationTest.class,
- CancelTest.class,
- QueryCheckerTest.class,
- DateTimeTest.class,
- TableSpoolTest.class,
- LimitOffsetTest.class
+ TableSpoolExecutionTest.class,
+ IndexSpoolExecutionTest.class,
})
-public class IgniteCalciteTestSuite {
+public class ExecutionTestSuite {
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index c346ada..a372500 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -22,14 +22,9 @@ import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
import org.apache.ignite.internal.processors.query.calcite.CancelTest;
import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
-import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
-import org.apache.ignite.internal.processors.query.calcite.TableSpoolTest;
import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
@@ -41,12 +36,10 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
- PlannerTest.class,
+ PlannerTestSuite.class,
+ ExecutionTestSuite.class,
OrToUnionRuleTest.class,
ProjectScanMergeRuleTest.class,
- ExecutionTest.class,
- MergeJoinExecutionTest.class,
- NestedLoopJoinExecutionTest.class,
ClosableIteratorsHolderTest.class,
ContinuousExecutionTest.class,
CalciteQueryProcessorTest.class,
@@ -55,7 +48,6 @@ import org.junit.runners.Suite;
CancelTest.class,
QueryCheckerTest.class,
DateTimeTest.class,
- TableSpoolTest.class,
LimitOffsetTest.class
})
public class IgniteCalciteTestSuite {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
new file mode 100644
index 0000000..ad329f1
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.IndexSpoolPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Calcite tests.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ PlannerTest.class,
+ CorrelatedNestedLoopJoinPlannerTest.class,
+ TableSpoolPlannerTest.class,
+ IndexSpoolPlannerTest.class
+})
+public class PlannerTestSuite {
+}