You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/12/09 21:34:24 UTC
[ignite] branch ignite-12248 updated: planner rethinking
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new b45e813 planner rethinking
b45e813 is described below
commit b45e813df3989d5edbf1262678381fdc7caec2c5
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Dec 10 00:34:04 2019 +0300
planner rethinking
---
.../{Util.java => InterpreterUtils.java} | 2 +-
.../Util.java => plan/volcano/VolcanoUtils.java} | 16 +-
.../query/calcite/CalciteQueryProcessor.java | 2 -
.../cluster/TableDistributionServiceImpl.java | 87 ----
.../query/calcite/exec/ConsumerNode.java | 2 +-
.../query/calcite/exec/ImplementorImpl.java | 174 +++++++
.../query/calcite/exec/Interpretable.java | 510 ---------------------
.../query/calcite/exec/ScalarFactory.java | 157 +++++++
.../processors/query/calcite/exec/ScanNode.java | 2 +-
.../query/calcite/metadata/FragmentInfo.java | 10 +-
.../metadata/IgniteMdDerivedDistribution.java | 149 ++++++
.../calcite/metadata/IgniteMdDistribution.java | 196 +++-----
.../calcite/metadata/IgniteMdFragmentInfo.java | 27 +-
.../query/calcite/metadata/IgniteMetadata.java | 29 +-
.../metadata/OptimisticPlanningException.java | 2 +-
.../query/calcite/metadata/RelMetadataQueryEx.java | 38 +-
.../calcite/prepare/DistributedExecution.java | 8 +-
.../query/calcite/prepare/IgnitePlanner.java | 35 +-
.../query/calcite/prepare/PlannerContext.java | 23 +-
.../calcite/{rule => prepare}/PlannerPhase.java | 29 +-
.../calcite/{rule => prepare}/PlannerType.java | 2 +-
.../query/calcite/rel/IgniteConvention.java | 41 ++
.../query/calcite/rel/IgniteExchange.java | 43 +-
.../processors/query/calcite/rel/IgniteFilter.java | 85 +---
.../processors/query/calcite/rel/IgniteJoin.java | 65 +--
.../query/calcite/rel/IgniteProject.java | 77 +---
.../query/calcite/rel/IgniteReceiver.java | 39 +-
.../processors/query/calcite/rel/IgniteRel.java | 26 +-
.../calcite/{util => rel}/IgniteRelShuttle.java | 9 +-
.../processors/query/calcite/rel/IgniteSender.java | 44 +-
.../query/calcite/rel/IgniteTableScan.java | 49 +-
.../DistributionType.java => rel/Implementor.java} | 41 +-
.../query/calcite/{util => rel}/RelOp.java | 2 +-
.../calcite/rule/AbstractVariableConverter.java | 47 ++
.../query/calcite/rule/FilterConverter.java | 62 +++
.../query/calcite/rule/IgniteFilterRule.java | 56 ---
.../query/calcite/rule/IgniteJoinRule.java | 140 ------
.../query/calcite/rule/IgniteProjectRule.java | 57 ---
.../processors/query/calcite/rule/IgniteRules.java | 160 -------
.../query/calcite/rule/JoinConverter.java | 68 +++
.../query/calcite/rule/ProjectConverter.java | 63 +++
.../TableScanConverter.java} | 25 +-
.../query/calcite/schema/IgniteSchema.java | 6 +-
.../query/calcite/schema/IgniteTable.java | 67 ++-
.../serialize/relation/ConversionContext.java | 4 +-
.../calcite/serialize/relation/FilterNode.java | 30 +-
.../serialize/relation/GraphToRelConverter.java | 7 +-
.../query/calcite/serialize/relation/JoinNode.java | 10 +-
.../calcite/serialize/relation/ProjectNode.java | 19 +-
.../calcite/serialize/relation/ReceiverNode.java | 10 +-
.../serialize/relation/RelToGraphConverter.java | 10 +-
.../calcite/serialize/relation/SenderNode.java | 22 +-
.../serialize/relation/SerializedTraits.java | 28 +-
.../calcite/serialize/relation/TableScanNode.java | 10 +-
.../query/calcite/{util => splitter}/Edge.java | 2 +-
.../query/calcite/splitter/Fragment.java | 15 +-
.../query/calcite/splitter/QueryPlan.java | 7 +-
.../splitter/{Source.java => RelSource.java} | 8 +-
.../{SourceImpl.java => RelSourceImpl.java} | 4 +-
.../splitter/{Target.java => RelTarget.java} | 6 +-
.../{TargetImpl.java => RelTargetImpl.java} | 10 +-
.../query/calcite/splitter/Splitter.java | 2 +-
...shFunctionFactory.java => AffinityFactory.java} | 42 +-
.../query/calcite/trait/DistributionTrait.java | 83 +++-
.../query/calcite/trait/DistributionTraitDef.java | 43 +-
.../query/calcite/trait/HashFunctionFactory.java | 2 +-
.../IgniteDistribution.java} | 11 +-
.../query/calcite/trait/IgniteDistributions.java | 252 ++++++++--
.../processors/query/calcite/util/Commons.java | 49 +-
.../query/calcite/util/IgniteMethod.java | 4 +-
.../query/calcite/util/RelImplementor.java | 40 --
.../query/calcite/CalciteQueryProcessorTest.java | 265 +++++------
72 files changed, 1729 insertions(+), 2038 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java b/modules/calcite/src/main/java/org/apache/calcite/interpreter/InterpreterUtils.java
similarity index 96%
copy from modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
copy to modules/calcite/src/main/java/org/apache/calcite/interpreter/InterpreterUtils.java
index a4ccb83..edae3cc 100644
--- a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
+++ b/modules/calcite/src/main/java/org/apache/calcite/interpreter/InterpreterUtils.java
@@ -21,7 +21,7 @@ import org.apache.calcite.DataContext;
/**
*
*/
-public class Util {
+public class InterpreterUtils {
public static Context createContext(DataContext ctx) {
return new Context(ctx);
}
diff --git a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java b/modules/calcite/src/main/java/org/apache/calcite/plan/volcano/VolcanoUtils.java
similarity index 63%
rename from modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
rename to modules/calcite/src/main/java/org/apache/calcite/plan/volcano/VolcanoUtils.java
index a4ccb83..a1874dc 100644
--- a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
+++ b/modules/calcite/src/main/java/org/apache/calcite/plan/volcano/VolcanoUtils.java
@@ -14,15 +14,21 @@
* limitations under the License.
*/
-package org.apache.calcite.interpreter;
+package org.apache.calcite.plan.volcano;
-import org.apache.calcite.DataContext;
+import org.apache.calcite.plan.RelTraitSet;
/**
*
*/
-public class Util {
- public static Context createContext(DataContext ctx) {
- return new Context(ctx);
+public class VolcanoUtils {
+ public static RelSubset subset(RelSubset subset, RelTraitSet traits) {
+ return subset.set.getOrCreateSubset(subset.getCluster(), traits.simplify());
+ }
+
+ public static VolcanoPlanner impatient(VolcanoPlanner planner) {
+ planner.impatient = true;
+
+ return planner;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index c244f4e..86a9a2e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.calcite.cluster.MappingServiceImpl;
-import org.apache.ignite.internal.processors.query.calcite.cluster.TableDistributionServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.DistributedExecution;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
@@ -154,7 +153,6 @@ public class CalciteQueryProcessor implements QueryEngine {
.query(query)
.schema(schemaHolder.schema())
.topologyVersion(readyAffinityVersion())
- .distributionService(new TableDistributionServiceImpl(kernalContext))
.mappingService(new MappingServiceImpl(kernalContext))
.build();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java
deleted file mode 100644
index 20da342..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.cluster;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.function.ToIntFunction;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.type.RowType;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- *
- */
-public class TableDistributionServiceImpl implements TableDistributionService {
- private final GridKernalContext ctx;
-
- public TableDistributionServiceImpl(GridKernalContext ctx) {
- this.ctx = ctx;
- }
-
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group();
-
- if (grp.isReplicated())
- return IgniteDistributions.broadcast();
-
- Object key = grp.affinity().similarAffinityKey();
-
- return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key));
- }
-
- private final static class AffinityFactory extends AbstractDestinationFunctionFactory {
- private final int cacheId;
- private final Object key;
-
- AffinityFactory(int cacheId, Object key) {
- this.cacheId = cacheId;
- this.key = key;
- }
-
- @Override public DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys) {
- assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
-
- List<List<UUID>> assignments = mapping.assignments();
-
- if (U.assertionsEnabled()) {
- for (List<UUID> assignment : assignments) {
- assert F.isEmpty(assignment) || assignment.size() == 1;
- }
- }
-
- ToIntFunction<Object> rowToPart = ctx.kernalContext()
- .cache().context().cacheContext(cacheId).affinity()::partition;
-
- return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
- }
-
- @Override public Object key() {
- return key;
- }
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
index c2a6211..100f21f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
@@ -30,7 +30,7 @@ public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<O
private ArrayDeque<Object[]> buff;
- protected ConsumerNode() {
+ public ConsumerNode() {
super(Sink.noOp());
buff = new ArrayDeque<>(DEFAULT_BUFFER_SIZE);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
new file mode 100644
index 0000000..2a65887
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Outbox;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
+import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+
+import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.PLANNER_CONTEXT;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.QUERY_ID;
+
+/**
+ *
+ */
+public class ImplementorImpl implements Implementor<Node<Object[]>>, RelOp<IgniteRel, Node<Object[]>> {
+ private final PlannerContext ctx;
+ private final DataContext root;
+ private final ScalarFactory factory;
+ private Deque<Sink<Object[]>> stack;
+
+ public ImplementorImpl(DataContext root) {
+ this.root = root;
+
+ ctx = PLANNER_CONTEXT.get(root);
+ factory = new ScalarFactory(new RexBuilder(ctx.typeFactory()));
+ stack = new ArrayDeque<>();
+ }
+
+ @Override public Node<Object[]> implement(IgniteSender rel) {
+ assert stack.isEmpty();
+
+ GridCacheVersion id = QUERY_ID.get(root);
+ long exchangeId = rel.target().exchangeId();
+ NodesMapping mapping = rel.target().mapping();
+ List<UUID> targets = mapping.nodes();
+ IgniteDistribution distribution = rel.target().distribution();
+ DestinationFunctionFactory destFactory = distribution.destinationFunctionFactory();
+ DestinationFunction function = destFactory.create(ctx, mapping, ImmutableIntList.copyOf(distribution.getKeys()));
+
+ Outbox<Object[]> res = new Outbox<>(id, exchangeId, targets, function);
+
+ stack.push(res.sink());
+
+ res.source(source(rel.getInput()));
+
+ return res;
+ }
+
+ @Override public Node<Object[]> implement(IgniteFilter rel) {
+ assert !stack.isEmpty();
+
+ FilterNode res = new FilterNode(stack.pop(), factory.filterPredicate(root, rel.getCondition(), rel.getRowType()));
+
+ stack.push(res.sink());
+
+ res.source(source(rel.getInput()));
+
+ return res;
+ }
+
+ @Override public Node<Object[]> implement(IgniteProject rel) {
+ assert !stack.isEmpty();
+
+ ProjectNode res = new ProjectNode(stack.pop(), factory.projectExpression(root, rel.getProjects(), rel.getInput().getRowType()));
+
+ stack.push(res.sink());
+
+ res.source(source(rel.getInput()));
+
+ return res;
+ }
+
+ @Override public Node<Object[]> implement(IgniteJoin rel) {
+ assert !stack.isEmpty();
+
+ JoinNode res = new JoinNode(stack.pop(), factory.joinExpression(root, rel.getCondition(), rel.getLeft().getRowType(), rel.getRight().getRowType()));
+
+ stack.push(res.sink(1));
+ stack.push(res.sink(0));
+
+ res.sources(sources(rel.getInputs()));
+
+ return res;
+ }
+
+ @Override public Node<Object[]> implement(IgniteTableScan rel) {
+ assert !stack.isEmpty();
+
+ Iterable<Object[]> source = rel.getTable().unwrap(ScannableTable.class).scan(root);
+
+ return new ScanNode(stack.pop(), source);
+ }
+
+ @Override public Node<Object[]> implement(IgniteReceiver rel) {
+ throw new AssertionError(); // TODO
+ }
+
+ @Override public Node<Object[]> implement(IgniteExchange rel) {
+ throw new AssertionError();
+ }
+
+ @Override public Node<Object[]> implement(IgniteRel other) {
+ throw new AssertionError();
+ }
+
+ private Source source(RelNode rel) {
+ if (rel.getConvention() != IgniteConvention.INSTANCE)
+ throw new IllegalStateException("INTERPRETABLE is required.");
+
+ return ((IgniteRel) rel).implement(this);
+ }
+
+ private List<Source> sources(List<RelNode> rels) {
+ ArrayList<Source> res = new ArrayList<>(rels.size());
+
+ for (RelNode rel : rels) {
+ res.add(source(rel));
+ }
+
+ return res;
+ }
+
+ @Override public Node<Object[]> go(IgniteRel rel) {
+ if (rel instanceof IgniteSender)
+ return implement((IgniteSender) rel);
+
+ ConsumerNode res = new ConsumerNode();
+
+ stack.push(res.sink());
+
+ res.source(source(rel));
+
+ return res;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java
deleted file mode 100644
index 04108d4..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.exec;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.JaninoRexCompiler;
-import org.apache.calcite.interpreter.Scalar;
-import org.apache.calcite.interpreter.Util;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.AbstractRelNode;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.calcite.exchange.Outbox;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.jetbrains.annotations.NotNull;
-
-import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.PLANNER_CONTEXT;
-import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.QUERY_ID;
-
-/**
- *
- */
-public class Interpretable {
- public static final Convention INTERPRETABLE = new Convention.Impl("INTERPRETABLE", InterRel.class) {};
-
- public static final List<RelOptRule> RULES = ImmutableList.of(
- new TableScanConverter(),
- new JoinConverter(),
- new ProjectConverter(),
- new FilterConverter(),
- new SenderConverter(),
- new ReceiverConverter()
- );
-
- public interface InterRel extends RelNode {
- <T> Node<T> implement(Implementor<T> implementor);
- }
-
- public static class TableScanConverter extends ConverterRule {
- public TableScanConverter() {
- super(IgniteTableScan.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "TableScanConverter");
- }
-
- @Override public RelNode convert(RelNode rel) {
- IgniteTableScan scan = (IgniteTableScan) rel;
-
- RelTraitSet traitSet = scan.getTraitSet().replace(INTERPRETABLE);
- return new ScanRel(rel.getCluster(), traitSet, scan.getTable());
- }
- }
-
- public static class JoinConverter extends ConverterRule {
- public JoinConverter() {
- super(IgniteJoin.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "JoinConverter");
- }
-
- @Override public RelNode convert(RelNode rel) {
- IgniteJoin join = (IgniteJoin) rel;
-
- RelNode left = convert(join.getLeft(), INTERPRETABLE);
- RelNode right = convert(join.getRight(), INTERPRETABLE);
-
- RelTraitSet traitSet = join.getTraitSet().replace(INTERPRETABLE);
-
- return new JoinRel(rel.getCluster(), traitSet, left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType());
- }
- }
-
- public static class ProjectConverter extends ConverterRule {
- public ProjectConverter() {
- super(IgniteProject.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "ProjectConverter");
- }
-
- @Override public RelNode convert(RelNode rel) {
- IgniteProject project = (IgniteProject) rel;
- RelTraitSet traitSet = project.getTraitSet().replace(INTERPRETABLE);
- RelNode input = convert(project.getInput(), INTERPRETABLE);
-
- return new ProjectRel(rel.getCluster(), traitSet, input, project.getProjects(), project.getRowType());
- }
- }
-
- public static class FilterConverter extends ConverterRule {
- public FilterConverter() {
- super(IgniteFilter.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "FilterConverter");
- }
-
- @Override public RelNode convert(RelNode rel) {
- IgniteFilter filter = (IgniteFilter) rel;
- RelTraitSet traitSet = filter.getTraitSet().replace(INTERPRETABLE);
- RelNode input = convert(filter.getInput(), INTERPRETABLE);
-
- return new FilterRel(rel.getCluster(), traitSet, input, filter.getCondition());
- }
- }
-
- public static class SenderConverter extends ConverterRule {
- public SenderConverter() {
- super(IgniteSender.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "SenderConverter");
- }
-
- @Override public RelNode convert(RelNode rel) {
- IgniteSender sender = (IgniteSender) rel;
- RelTraitSet traitSet = sender.getTraitSet().replace(INTERPRETABLE);
- RelNode input = convert(sender.getInput(), INTERPRETABLE);
-
- return new SenderRel(rel.getCluster(), traitSet, input, sender.target());
- }
- }
-
- public static class ReceiverConverter extends ConverterRule {
- public ReceiverConverter() {
- super(IgniteReceiver.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "ReceiverConverter");
- }
-
- @Override public RelNode convert(RelNode rel) {
- IgniteReceiver sender = (IgniteReceiver) rel;
- RelTraitSet traitSet = sender.getTraitSet().replace(INTERPRETABLE);
-
- return new ReceiverRel(rel.getCluster(), traitSet, sender.source());
- }
- }
-
- public static class ReceiverRel extends AbstractRelNode implements InterRel {
- private final org.apache.ignite.internal.processors.query.calcite.splitter.Source source;
-
- protected ReceiverRel(RelOptCluster cluster, RelTraitSet traits, org.apache.ignite.internal.processors.query.calcite.splitter.Source source) {
- super(cluster, traits);
-
- this.source = source;
- }
-
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new ReceiverRel(getCluster(), traitSet, source);
- }
-
- @Override public <T> Node<T> implement(Implementor<T> implementor) {
- return implementor.implement(this);
- }
- }
-
- public static class SenderRel extends SingleRel implements InterRel {
- private final Target target;
-
- protected SenderRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, @NotNull Target target) {
- super(cluster, traits, input);
- this.target = target;
- }
-
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new SenderRel(getCluster(), traitSet, sole(inputs), target);
- }
-
- @Override public <T> Node<T> implement(Implementor<T> implementor) {
- return implementor.implement(this);
- }
- }
-
- public static class FilterRel extends Filter implements InterRel {
- protected FilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
- super(cluster, traits, child, condition);
- }
-
- @Override public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
- return new FilterRel(getCluster(), traitSet, input, condition);
- }
-
- @Override public <T> Node<T> implement(Implementor<T> implementor) {
- return implementor.implement(this);
- }
- }
-
- public static class ProjectRel extends Project implements InterRel {
- protected ProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
- super(cluster, traits, input, projects, rowType);
- }
-
- @Override public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
- return new ProjectRel(getCluster(), traitSet, input, projects, rowType);
- }
-
- @Override public <T> Node<T> implement(Implementor<T> implementor) {
- return implementor.implement(this);
- }
- }
-
- public static class JoinRel extends Join implements InterRel {
- protected JoinRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
- super(cluster, traitSet, left, right, condition, variablesSet, joinType);
- }
-
- @Override public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
- return new JoinRel(getCluster(), traitSet, left, right, condition, variablesSet, joinType);
- }
-
- @Override public <T> Node<T> implement(Implementor<T> implementor) {
- return implementor.implement(this);
- }
- }
-
- public static class ScanRel extends TableScan implements InterRel {
- protected ScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
- super(cluster, traitSet, table);
- }
-
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return this;
- }
-
- @Override public <T> Node<T> implement(Implementor<T> implementor) {
- return implementor.implement(this);
- }
- }
-
- public static class Implementor<T> {
- private final PlannerContext ctx;
- private final DataContext root;
- private final ScalarFactory factory;
- private Deque<Sink<T>> stack;
-
- public Implementor(DataContext root) {
- this.root = root;
-
- ctx = PLANNER_CONTEXT.get(root);
- factory = new ScalarFactory(new RexBuilder(ctx.typeFactory()));
- stack = new ArrayDeque<>();
- }
-
- public Node<T> implement(SenderRel rel) {
- assert stack.isEmpty();
-
- GridCacheVersion id = QUERY_ID.get(root);
- long exchangeId = rel.target.exchangeId();
- NodesMapping mapping = rel.target.mapping();
- List<UUID> targets = mapping.nodes();
- DistributionTrait distribution = rel.target.distribution();
- DestinationFunctionFactory destFactory = distribution.destinationFunctionFactory();
- DestinationFunction function = destFactory.create(ctx, mapping, distribution.keys());
-
- Outbox<T> res = new Outbox<>(id, exchangeId, targets, function);
-
- stack.push(res.sink());
-
- res.source(source(rel.getInput()));
-
- return res;
- }
-
- public Node<T> implement(FilterRel rel) {
- assert !stack.isEmpty();
-
- FilterNode res = new FilterNode((Sink<Object[]>) stack.pop(), factory.filterPredicate(root, rel.getCondition(), rel.getRowType()));
-
- stack.push((Sink<T>) res.sink());
-
- res.source(source(rel.getInput()));
-
- return (Node<T>) res;
- }
-
- public Node<T> implement(ProjectRel rel) {
- assert !stack.isEmpty();
-
- ProjectNode res = new ProjectNode((Sink<Object[]>) stack.pop(), factory.projectExpression(root, rel.getProjects(), rel.getInput().getRowType()));
-
- stack.push((Sink<T>) res.sink());
-
- res.source(source(rel.getInput()));
-
- return (Node<T>) res;
- }
-
- public Node<T> implement(JoinRel rel) {
- assert !stack.isEmpty();
-
- JoinNode res = new JoinNode((Sink<Object[]>) stack.pop(), factory.joinExpression(root, rel.getCondition(), rel.getLeft().getRowType(), rel.getRight().getRowType()));
-
- stack.push((Sink<T>) res.sink(1));
- stack.push((Sink<T>) res.sink(0));
-
- res.sources(sources(rel.getInputs()));
-
- return (Node<T>) res;
- }
-
- public Node<T> implement(ScanRel rel) {
- assert !stack.isEmpty();
-
- Iterable<Object[]> source = rel.getTable().unwrap(ScannableTable.class).scan(root);
-
- return (Node<T>) new ScanNode((Sink<Object[]>)stack.pop(), source);
- }
-
- public Node<T> implement(ReceiverRel rel) {
- throw new AssertionError(); // TODO
- }
-
- private Source source(RelNode rel) {
- if (rel.getConvention() != INTERPRETABLE)
- throw new IllegalStateException("INTERPRETABLE is required.");
-
- return ((InterRel)rel).implement(this);
- }
-
- private List<Source> sources(List<RelNode> rels) {
- ArrayList<Source> res = new ArrayList<>(rels.size());
-
- for (RelNode rel : rels) {
- res.add(source(rel));
- }
-
- return res;
- }
-
- public Node<T> go(RelNode rel) {
- if (rel.getConvention() != INTERPRETABLE)
- throw new IllegalStateException("INTERPRETABLE is required.");
-
- if (rel instanceof SenderRel)
- return implement((SenderRel)rel);
-
- ConsumerNode res = new ConsumerNode();
-
- stack.push((Sink<T>) res.sink());
-
- res.source(source(rel));
-
- return (Node<T>) res;
- }
- }
-
- /**
- *
- */
- private static class ScalarFactory {
- private final JaninoRexCompiler rexCompiler;
- private final RexBuilder builder;
-
- private ScalarFactory(RexBuilder builder) {
- rexCompiler = new JaninoRexCompiler(builder);
- this.builder = builder;
- }
-
- public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) {
- System.out.println("filterPredicate for" + filter);
-
- Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType);
- Context ctx = Util.createContext(root);
-
- return new FilterPredicate<>(ctx, scalar);
- }
-
- public <T> Function<T, T> projectExpression(DataContext root, List<RexNode> projects, RelDataType rowType) {
- System.out.println("joinExpression for" + projects);
-
- Scalar scalar = rexCompiler.compile(projects, rowType);
- Context ctx = Util.createContext(root);
- int count = projects.size();
-
- return new ProjectExpression<>(ctx, scalar, count);
- }
-
- public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) {
- System.out.println("joinExpression for" + expression);
-
- RelDataType rowType = combinedType(leftType, rightType);
-
- Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType);
- Context ctx = Util.createContext(root);
- ctx.values = new Object[rowType.getFieldCount()];
-
- return new JoinExpression<>(ctx, scalar);
- }
-
- private RelDataType combinedType(RelDataType... types) {
- RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory());
-
- for (RelDataType type : types)
- typeBuilder.addAll(type.getFieldList());
-
- return typeBuilder.build();
- }
-
- private static class FilterPredicate<T> implements Predicate<T> {
- private final Context ctx;
- private final Scalar scalar;
- private final Object[] vals;
-
- private FilterPredicate(Context ctx, Scalar scalar) {
- this.ctx = ctx;
- this.scalar = scalar;
-
- vals = new Object[1];
- }
-
- @Override public boolean test(T r) {
- ctx.values = (Object[]) r;
- scalar.execute(ctx, vals);
- return (Boolean) vals[0];
- }
- }
-
- private static class JoinExpression<T> implements BiFunction<T, T, T> {
- private final Object[] vals;
- private final Context ctx;
- private final Scalar scalar;
-
- private Object[] left0;
-
- private JoinExpression(Context ctx, Scalar scalar) {
- this.ctx = ctx;
- this.scalar = scalar;
-
- vals = new Object[1];
- }
-
- @Override public T apply(T left, T right) {
- if (left0 != left) {
- left0 = (Object[]) left;
- System.arraycopy(left0, 0, ctx.values, 0, left0.length);
- }
-
- Object[] right0 = (Object[]) right;
- System.arraycopy(right0, 0, ctx.values, left0.length, right0.length);
-
- scalar.execute(ctx, vals);
-
- if ((Boolean) vals[0])
- return (T) Arrays.copyOf(ctx.values, ctx.values.length);
-
- return null;
- }
- }
-
- private static class ProjectExpression<T> implements Function<T, T> {
- private final Context ctx;
- private final Scalar scalar;
- private final int count;
-
- private ProjectExpression(Context ctx, Scalar scalar, int count) {
- this.ctx = ctx;
- this.scalar = scalar;
- this.count = count;
- }
-
- @Override public T apply(T r) {
- ctx.values = (Object[]) r;
- Object[] res = new Object[count];
- scalar.execute(ctx, res);
-
- return (T) res;
- }
- }
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
new file mode 100644
index 0000000..1d4e31d
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.InterpreterUtils;
+import org.apache.calcite.interpreter.JaninoRexCompiler;
+import org.apache.calcite.interpreter.Scalar;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Factory that compiles Calcite row expressions ({@link RexNode}) into plain Java functional objects
+ * (predicate, projection and join functions). The produced callbacks cast every row to {@code Object[]}.
+ */
+public class ScalarFactory {
+    /** Compiler producing executable {@link Scalar}s from row expressions. */
+    private final JaninoRexCompiler rexCompiler;
+
+    /** Rex builder; its type factory is used to build the combined row type for joins. */
+    private final RexBuilder builder;
+
+    /**
+     * @param builder Rex builder handed to the expression compiler.
+     */
+    public ScalarFactory(RexBuilder builder) {
+        rexCompiler = new JaninoRexCompiler(builder);
+        this.builder = builder;
+    }
+
+    /**
+     * Compiles a filter condition into a row predicate.
+     *
+     * @param root Data context the execution context is created from.
+     * @param filter Condition to compile.
+     * @param rowType Type of the rows the condition is evaluated against.
+     * @return Predicate that accepts a row only when the condition evaluates to {@code TRUE}.
+     */
+    public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) {
+        System.out.println("filterPredicate for" + filter);
+
+        Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType);
+        Context ctx = InterpreterUtils.createContext(root);
+
+        return new FilterPredicate<>(ctx, scalar);
+    }
+
+    /**
+     * Compiles a list of projections into a row-to-row function.
+     *
+     * @param root Data context the execution context is created from.
+     * @param projects Projection expressions, one per output column.
+     * @param rowType Type of the input rows.
+     * @return Function producing a new {@code projects.size()}-wide row for each input row.
+     */
+    public <T> Function<T, T> projectExpression(DataContext root, List<RexNode> projects, RelDataType rowType) {
+        // The label previously read "joinExpression for" (copy-paste slip).
+        System.out.println("projectExpression for" + projects);
+
+        Scalar scalar = rexCompiler.compile(projects, rowType);
+        Context ctx = InterpreterUtils.createContext(root);
+        int count = projects.size();
+
+        return new ProjectExpression<>(ctx, scalar, count);
+    }
+
+    /**
+     * Compiles a join condition into a function over a (left row, right row) pair.
+     *
+     * @param root Data context the execution context is created from.
+     * @param expression Join condition, expressed over the concatenation of left and right fields.
+     * @param leftType Type of the left rows.
+     * @param rightType Type of the right rows.
+     * @return Function returning the concatenated row when the condition evaluates to {@code TRUE},
+     *      {@code null} otherwise.
+     */
+    public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) {
+        System.out.println("joinExpression for" + expression);
+
+        RelDataType rowType = combinedType(leftType, rightType);
+
+        Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType);
+        Context ctx = InterpreterUtils.createContext(root);
+
+        // Pre-allocate the buffer both input rows are copied into before each evaluation.
+        ctx.values = new Object[rowType.getFieldCount()];
+
+        return new JoinExpression<>(ctx, scalar);
+    }
+
+    /**
+     * @param types Row types to concatenate, in order.
+     * @return Row type containing the fields of all the given types.
+     */
+    private RelDataType combinedType(RelDataType... types) {
+        RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory());
+
+        for (RelDataType type : types)
+            typeBuilder.addAll(type.getFieldList());
+
+        return typeBuilder.build();
+    }
+
+    /** Row predicate backed by a compiled condition. */
+    private static class FilterPredicate<T> implements Predicate<T> {
+        /** Execution context; its {@code values} field is pointed at the current row. */
+        private final Context ctx;
+
+        /** Compiled condition. */
+        private final Scalar scalar;
+
+        /** Reusable single-slot buffer receiving the condition result. */
+        private final Object[] vals;
+
+        /**
+         * @param ctx Execution context.
+         * @param scalar Compiled condition.
+         */
+        private FilterPredicate(Context ctx, Scalar scalar) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+
+            vals = new Object[1];
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean test(T r) {
+            ctx.values = (Object[]) r;
+            scalar.execute(ctx, vals);
+
+            // A condition may evaluate to SQL NULL; treat it as "not matched" instead of failing on unboxing.
+            Boolean matched = (Boolean) vals[0];
+
+            return matched != null && matched;
+        }
+    }
+
+    /** Join function backed by a compiled condition over the concatenated row. */
+    private static class JoinExpression<T> implements BiFunction<T, T, T> {
+        /** Reusable single-slot buffer receiving the condition result. */
+        private final Object[] vals;
+
+        /** Execution context; its {@code values} array holds the concatenated row. */
+        private final Context ctx;
+
+        /** Compiled condition. */
+        private final Scalar scalar;
+
+        /** Left row copied into the buffer last time; lets repeated calls with the same array skip the copy. */
+        private Object[] left0;
+
+        /**
+         * @param ctx Execution context whose {@code values} array is already sized for the combined row.
+         * @param scalar Compiled condition.
+         */
+        private JoinExpression(Context ctx, Scalar scalar) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+
+            vals = new Object[1];
+        }
+
+        /** {@inheritDoc} */
+        @Override public T apply(T left, T right) {
+            // Re-copy the left part only when a different array instance arrives.
+            if (left0 != left) {
+                left0 = (Object[]) left;
+                System.arraycopy(left0, 0, ctx.values, 0, left0.length);
+            }
+
+            Object[] right0 = (Object[]) right;
+            System.arraycopy(right0, 0, ctx.values, left0.length, right0.length);
+
+            scalar.execute(ctx, vals);
+
+            // NULL condition result means the pair does not join.
+            Boolean matched = (Boolean) vals[0];
+
+            if (matched != null && matched)
+                return (T) Arrays.copyOf(ctx.values, ctx.values.length);
+
+            return null;
+        }
+    }
+
+    /** Projection function backed by compiled projection expressions. */
+    private static class ProjectExpression<T> implements Function<T, T> {
+        /** Execution context; its {@code values} field is pointed at the current row. */
+        private final Context ctx;
+
+        /** Compiled projections. */
+        private final Scalar scalar;
+
+        /** Number of output columns. */
+        private final int count;
+
+        /**
+         * @param ctx Execution context.
+         * @param scalar Compiled projections.
+         * @param count Number of output columns.
+         */
+        private ProjectExpression(Context ctx, Scalar scalar, int count) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+            this.count = count;
+        }
+
+        /** {@inheritDoc} */
+        @Override public T apply(T r) {
+            ctx.values = (Object[]) r;
+
+            // A fresh array per row: the result is handed downstream and must not be shared.
+            Object[] res = new Object[count];
+            scalar.execute(ctx, res);
+
+            return (T) res;
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
index 37c128a..6acaa2b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
@@ -32,7 +32,7 @@ public class ScanNode implements SingleNode<Object[]> {
private Iterator<Object[]> it;
private Object[] row;
- protected ScanNode(Sink<Object[]> target, Iterable<Object[]> source) {
+ public ScanNode(Sink<Object[]> target, Iterable<Object[]> source) {
this.target = target;
this.source = source;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index b282ea7..de15e4d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -19,16 +19,16 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
/**
*
*/
public class FragmentInfo {
private final NodesMapping mapping;
- private final ImmutableList<Pair<IgniteReceiver, Source>> sources;
+ private final ImmutableList<Pair<IgniteReceiver, RelSource>> sources;
- public FragmentInfo(Pair<IgniteReceiver, Source> source) {
+ public FragmentInfo(Pair<IgniteReceiver, RelSource> source) {
this(ImmutableList.of(source), null);
}
@@ -36,7 +36,7 @@ public class FragmentInfo {
this(null, mapping);
}
- public FragmentInfo(ImmutableList<Pair<IgniteReceiver, Source>> sources, NodesMapping mapping) {
+ public FragmentInfo(ImmutableList<Pair<IgniteReceiver, RelSource>> sources, NodesMapping mapping) {
this.sources = sources;
this.mapping = mapping;
}
@@ -45,7 +45,7 @@ public class FragmentInfo {
return mapping;
}
- public ImmutableList<Pair<IgniteReceiver, Source>> sources() {
+ public ImmutableList<Pair<IgniteReceiver, RelSource>> sources() {
return sources;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDerivedDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDerivedDistribution.java
new file mode 100644
index 0000000..6858b13
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDerivedDistribution.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.AbstractConverter;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.plan.volcano.VolcanoUtils;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DerivedDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Metadata handler for {@link DerivedDistribution}. For a relational node it returns a list of
+ * distributions derived for that node; the {@code deriveDistributions} overloads below supply the
+ * per-node-type logic.
+ */
+public class IgniteMdDerivedDistribution implements MetadataHandler<DerivedDistribution> {
+ /**
+  * Convention requested by the current call to
+  * {@link #deriveDistributions(RelNode, Convention, RelMetadataQuery)}; {@link Convention#NONE} when unset.
+  * Read by the {@link RelSubset} overload to choose which subset to inspect.
+  */
+ private static final ThreadLocal<Convention> REQUESTED_CONVENTION = ThreadLocal.withInitial(() -> Convention.NONE);
+
+ /** Metadata provider built reflectively from {@code IgniteMethod.DERIVED_DISTRIBUTIONS} and an instance of this handler. */
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ IgniteMethod.DERIVED_DISTRIBUTIONS.method(), new IgniteMdDerivedDistribution());
+
+ /** {@inheritDoc} */
+ @Override public MetadataDef<DerivedDistribution> getDef() {
+ return DerivedDistribution.DEF;
+ }
+
+ /** Abstract converters contribute no distributions: always returns an empty list. */
+ public List<IgniteDistribution> deriveDistributions(AbstractConverter rel, RelMetadataQuery mq) {
+ return Collections.emptyList();
+ }
+
+ /** Generic case: the node's own distribution (see {@code IgniteMdDistribution._distribution}), wrapped via {@code F.asList}. */
+ public List<IgniteDistribution> deriveDistributions(RelNode rel, RelMetadataQuery mq) {
+ return F.asList(IgniteMdDistribution._distribution(rel, mq));
+ }
+
+ /** Ignite nodes: the node's own distribution, wrapped via {@code F.asList}. */
+ public List<IgniteDistribution> deriveDistributions(IgniteRel rel, RelMetadataQuery mq) {
+ return F.asList(IgniteMdDistribution._distribution(rel, mq));
+ }
+
+ /** Logical table scans: the scan's own distribution, wrapped via {@code F.asList}. */
+ public List<IgniteDistribution> deriveDistributions(LogicalTableScan rel, RelMetadataQuery mq) {
+ return F.asList(IgniteMdDistribution._distribution(rel, mq));
+ }
+
+ /** Logical values: the node's own distribution, wrapped via {@code F.asList}. */
+ public List<IgniteDistribution> deriveDistributions(LogicalValues rel, RelMetadataQuery mq) {
+ return F.asList(IgniteMdDistribution._distribution(rel, mq));
+ }
+
+ /**
+  * Logical projects: derives the input's distributions and applies to each the partial mapping
+  * computed from the input field count and the project expressions.
+  */
+ public List<IgniteDistribution> deriveDistributions(LogicalProject rel, RelMetadataQuery mq) {
+ Mappings.TargetMapping mapping =
+ Project.getPartialMapping(rel.getInput().getRowType().getFieldCount(), rel.getProjects());
+
+ return Commons.transform(_deriveDistributions(rel.getInput(), mq), i -> i.apply(mapping));
+ }
+
+ /**
+  * Single-input nodes: delegates to the {@link IgniteRel} overload when the node is an Ignite node,
+  * otherwise returns the distributions derived for the input.
+  */
+ public List<IgniteDistribution> deriveDistributions(SingleRel rel, RelMetadataQuery mq) {
+ if (rel instanceof IgniteRel)
+ return deriveDistributions((IgniteRel)rel, mq);
+
+ return _deriveDistributions(rel.getInput(), mq);
+ }
+
+ /**
+  * Volcano subsets: switches to the subset obtained from {@code VolcanoUtils.subset} for the requested
+  * convention and collects the distinct distributions derived for every node in it. When nothing is
+  * found, retries with the {@link Convention#NONE} subset, provided that is a different subset instance.
+  */
+ public List<IgniteDistribution> deriveDistributions(RelSubset rel, RelMetadataQuery mq) {
+ rel = VolcanoUtils.subset(rel, rel.getTraitSet().replace(REQUESTED_CONVENTION.get()));
+
+ HashSet<IgniteDistribution> res = new HashSet<>();
+
+ for (RelNode rel0 : rel.getRels())
+ res.addAll(_deriveDistributions(rel0, mq));
+
+ if (F.isEmpty(res)) {
+ RelSubset newRel = VolcanoUtils.subset(rel, rel.getTraitSet().replace(Convention.NONE));
+
+ if (newRel != rel) {
+ for (RelNode rel0 : newRel.getRels())
+ res.addAll(_deriveDistributions(rel0, mq));
+ }
+ }
+
+ return new ArrayList<>(res);
+ }
+
+ /** Hep vertices: delegates to the vertex's current node. */
+ public List<IgniteDistribution> deriveDistributions(HepRelVertex rel, RelMetadataQuery mq) {
+ return _deriveDistributions(rel.getCurrentRel(), mq);
+ }
+
+ /** Logical filters: returns the distributions derived for the input unchanged. */
+ public List<IgniteDistribution> deriveDistributions(LogicalFilter rel, RelMetadataQuery mq) {
+ return _deriveDistributions(rel.getInput(), mq);
+ }
+
+ /**
+  * Logical joins: derives distributions for both inputs, passes them with the join condition and type
+  * to {@code IgniteDistributions.suggestJoin} and returns the {@code out()} of every suggestion.
+  */
+ public List<IgniteDistribution> deriveDistributions(LogicalJoin rel, RelMetadataQuery mq) {
+ List<IgniteDistribution> left = _deriveDistributions(rel.getLeft(), mq);
+ List<IgniteDistribution> right = _deriveDistributions(rel.getRight(), mq);
+
+ return Commons.transform(IgniteDistributions.suggestJoin(left, right, rel.analyzeCondition(), rel.getJoinType()),
+ IgniteDistributions.BiSuggestion::out);
+ }
+
+ /** Asks the metadata query, wrapped as {@code RelMetadataQueryEx}, for the node's derived distributions. */
+ private static List<IgniteDistribution> _deriveDistributions(RelNode rel, RelMetadataQuery mq) {
+ return RelMetadataQueryEx.wrap(mq).derivedDistributions(rel);
+ }
+
+ /**
+  * Entry point: derives distributions for {@code rel} with {@code convention} as the requested convention.
+  *
+  * @param rel Node to derive distributions for.
+  * @param convention Convention stored in the thread-local for the duration of the call.
+  * @param mq Metadata query.
+  * @return Derived distributions.
+  */
+ public static List<IgniteDistribution> deriveDistributions(RelNode rel, Convention convention, RelMetadataQuery mq) {
+ try {
+ REQUESTED_CONVENTION.set(convention);
+
+ return _deriveDistributions(rel, mq);
+ }
+ finally {
+ // Clears the thread-local, so the next read yields the initial Convention.NONE again.
+ REQUESTED_CONVENTION.remove();
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index 7c0491d..9673835 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -16,186 +16,108 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
import org.apache.calcite.rel.metadata.MetadataDef;
import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexSlot;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.calcite.util.BuiltInMethod;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.internal.util.typedef.F;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.SINGLE;
+import static org.apache.calcite.rel.RelDistribution.Type.ANY;
/**
*
*/
-public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.DistributionTraitMetadata> {
+public class IgniteMdDistribution implements MetadataHandler<BuiltInMetadata.Distribution> {
public static final RelMetadataProvider SOURCE =
ReflectiveRelMetadataProvider.reflectiveSource(
- IgniteMethod.DISTRIBUTION_TRAIT.method(), new IgniteMdDistribution());
+ BuiltInMethod.DISTRIBUTION.method, new IgniteMdDistribution());
- @Override public MetadataDef<IgniteMetadata.DistributionTraitMetadata> getDef() {
- return IgniteMetadata.DistributionTraitMetadata.DEF;
+ @Override public MetadataDef<BuiltInMetadata.Distribution> getDef() {
+ return BuiltInMetadata.Distribution.DEF;
}
- public DistributionTrait getDistributionTrait(RelNode rel, RelMetadataQuery mq) {
+ public IgniteDistribution distribution(RelNode rel, RelMetadataQuery mq) {
return DistributionTraitDef.INSTANCE.getDefault();
}
- public DistributionTrait getDistributionTrait(Filter filter, RelMetadataQuery mq) {
+ public IgniteDistribution distribution(Filter filter, RelMetadataQuery mq) {
return filter(mq, filter.getInput(), filter.getCondition());
}
- public DistributionTrait getDistributionTrait(Project project, RelMetadataQuery mq) {
+ public IgniteDistribution distribution(Project project, RelMetadataQuery mq) {
return project(mq, project.getInput(), project.getProjects());
}
- public DistributionTrait getDistributionTrait(Join join, RelMetadataQuery mq) {
+ public IgniteDistribution distribution(Join join, RelMetadataQuery mq) {
return join(mq, join.getLeft(), join.getRight(), join.analyzeCondition(), join.getJoinType());
}
- public DistributionTrait getDistributionTrait(RelSubset rel, RelMetadataQuery mq) {
+ public IgniteDistribution distribution(RelSubset rel, RelMetadataQuery mq) {
return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
}
- public DistributionTrait getDistributionTrait(IgniteTableScan rel, RelMetadataQuery mq) {
+ public IgniteDistribution distribution(TableScan rel, RelMetadataQuery mq) {
return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
}
- public static DistributionTrait project(RelMetadataQuery mq, RelNode input, List<? extends RexNode> projects) {
- DistributionTrait trait = distribution(input, mq);
-
- if (trait.type() == HASH) {
- ImmutableIntList keys = trait.keys();
-
- if (keys.size() > projects.size())
- return IgniteDistributions.random();
-
- Map<Integer, Integer> m = new HashMap<>(projects.size());
-
- for (Ord<? extends RexNode> node : Ord.zip(projects)) {
- if (node.e instanceof RexInputRef)
- m.put( ((RexSlot) node.e).getIndex(), node.i);
- else if (node.e.isA(SqlKind.CAST)) {
- RexNode operand = ((RexCall) node.e).getOperands().get(0);
-
- if (operand instanceof RexInputRef)
- m.put(((RexSlot) operand).getIndex(), node.i);
- }
- }
-
- List<Integer> newKeys = new ArrayList<>(keys.size());
-
- for (Integer key : keys) {
- Integer mapped = m.get(key);
-
- if (mapped == null)
- return IgniteDistributions.random();
-
- newKeys.add(mapped);
- }
-
- return IgniteDistributions.hash(newKeys, trait.destinationFunctionFactory());
- }
-
- return trait;
- }
-
- public static DistributionTrait filter(RelMetadataQuery mq, RelNode input, RexNode condition) {
- return distribution(input, mq);
- }
-
- public static DistributionTrait join(RelMetadataQuery mq, RelNode left, RelNode right, JoinInfo joinInfo, JoinRelType joinType) {
- /*
- * Distributions table:
- *
- * ===============INNER JOIN==============
- * hash + hash = hash
- * broadcast + hash = hash
- * hash + broadcast = hash
- * broadcast + broadcast = broadcast
- * single + single = single
- *
- * ===============LEFT JOIN===============
- * hash + hash = hash
- * hash + broadcast = hash
- * broadcast + broadcast = broadcast
- * single + single = single
- *
- * ===============RIGHT JOIN==============
- * hash + hash = hash
- * broadcast + hash = hash
- * broadcast + broadcast = broadcast
- * single + single = single
- *
- * ===========FULL JOIN/CROSS JOIN========
- * broadcast + broadcast = broadcast
- * single + single = single
- *
- *
- * others are impossible TODO assertions
- */
-
- DistributionTrait leftDistr = distribution(left, mq);
- DistributionTrait rightDistr;
-
- switch (joinType) {
- case FULL:
- case LEFT:
- return leftDistr;
- case INNER:
- rightDistr = distribution(right, mq);
-
- if (joinInfo.keys().isEmpty()
- || (leftDistr.type() == HASH || leftDistr.type() == SINGLE)
- || (leftDistr.type() == BROADCAST && rightDistr.type() == BROADCAST))
- return leftDistr;
-
- if (rightDistr == null)
- rightDistr = distribution(right, mq);
-
- assert rightDistr.type() == HASH;
-
- return IgniteDistributions.hash(joinInfo.leftKeys, rightDistr.destinationFunctionFactory());
- case RIGHT:
- rightDistr = distribution(right, mq);
-
- if (leftDistr.type() == SINGLE
- || (leftDistr.type() == HASH && rightDistr.type() == HASH)
- || (leftDistr.type() == BROADCAST && rightDistr.type() == BROADCAST))
- return leftDistr;
- assert rightDistr.type() == HASH;
-
- return IgniteDistributions.hash(joinInfo.leftKeys, rightDistr.destinationFunctionFactory());
- default:
- throw new UnsupportedOperationException();
- }
- }
-
- public static DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) {
- return RelMetadataQueryEx.wrap(mq).getDistributionTrait(rel);
+ /** Values nodes always report the distribution returned by {@code IgniteDistributions.broadcast()}. */
+ public IgniteDistribution distribution(Values values, RelMetadataQuery mq) {
+ return IgniteDistributions.broadcast();
+ }
+
+ /** Exchange nodes report the distribution stored on the exchange, cast to {@code IgniteDistribution}. */
+ public IgniteDistribution distribution(Exchange exchange, RelMetadataQuery mq) {
+ return (IgniteDistribution) exchange.distribution;
+ }
+
+ /** Hep vertices delegate to the distribution of the vertex's current node. */
+ public IgniteDistribution distribution(HepRelVertex rel, RelMetadataQuery mq) {
+ return _distribution(rel.getCurrentRel(), mq);
+ }
+
+ /**
+  * Distribution of a projection over {@code input}: resolves the input's row type and distribution
+  * and delegates to {@code project(RelDataType, IgniteDistribution, List)}.
+  */
+ public static IgniteDistribution project(RelMetadataQuery mq, RelNode input, List<? extends RexNode> projects) {
+ return project(input.getRowType(), _distribution(input, mq), projects);
+ }
+
+ /**
+  * Applies to {@code inDistr} the partial mapping that {@code Project.getPartialMapping} computes from
+  * the input field count and the project expressions.
+  */
+ public static IgniteDistribution project(RelDataType inType, IgniteDistribution inDistr, List<? extends RexNode> projects) {
+ return inDistr.apply(Project.getPartialMapping(inType.getFieldCount(), projects));
+ }
+
+ /** Distribution of a filter: the input's distribution; {@code condition} is not consulted. */
+ public static IgniteDistribution filter(RelMetadataQuery mq, RelNode input, RexNode condition) {
+ return _distribution(input, mq);
+ }
+
+ /** Distribution of a join: resolves both inputs' distributions and delegates to the distribution-based overload. */
+ public static IgniteDistribution join(RelMetadataQuery mq, RelNode left, RelNode right, JoinInfo joinInfo, JoinRelType joinType) {
+ return join(_distribution(left, mq), _distribution(right, mq), joinInfo, joinType);
+ }
+
+ /**
+  * Returns the {@code out()} of the first suggestion produced by {@code IgniteDistributions.suggestJoin}.
+  * NOTE(review): assumes at least one suggestion is returned (and that {@code F.first} does not yield
+  * {@code null}) - confirm against {@code suggestJoin}.
+  */
+ public static IgniteDistribution join(IgniteDistribution left, IgniteDistribution right, JoinInfo joinInfo, JoinRelType joinType) {
+ return F.first(IgniteDistributions.suggestJoin(left, right, joinInfo, joinType)).out();
+ }
+
+ /**
+  * Resolves the distribution of a node: the distribution trait carried by the node itself when it is
+  * present and not {@code ANY}, otherwise whatever the metadata query reports.
+  *
+  * @param rel Node to resolve the distribution for.
+  * @param mq Metadata query used as a fallback.
+  * @return Node distribution.
+  */
+ public static IgniteDistribution _distribution(RelNode rel, RelMetadataQuery mq) {
+     IgniteDistribution distr = rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+
+     // getTrait() yields null when the trait set does not carry the distribution trait def at all;
+     // fall through to the metadata query in that case instead of failing on distr.getType().
+     if (distr != null && distr.getType() != ANY)
+         return distr;
+
+     return (IgniteDistribution) mq.distribution(rel);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 503b424..c5f941e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.metadata.MetadataDef;
import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
@@ -31,7 +32,9 @@ import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.util.Edge;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Edge;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
/**
@@ -58,7 +61,7 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
return fragmentInfo(rel.getInput(), mq);
}
- public FragmentInfo getFragmentInfo(BiRel rel, RelMetadataQuery mq) {
+ public FragmentInfo getFragmentInfo(Join rel, RelMetadataQuery mq) {
mq = RelMetadataQueryEx.wrap(mq);
FragmentInfo left = fragmentInfo(rel.getLeft(), mq);
@@ -84,24 +87,24 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
}
}
- private OptimisticPlanningException planningException(BiRel rel, Exception cause, boolean splitLeft) {
- String msg = "Failed to calculate physical distribution";
-
- if (splitLeft)
- return new OptimisticPlanningException(msg, new Edge(rel, rel.getLeft(), 0), cause);
-
- return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause);
- }
-
public FragmentInfo getFragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
return new FragmentInfo(Pair.of(rel, rel.source()));
}
public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery mq) {
- return rel.fragmentInfo();
+ return rel.getTable().unwrap(IgniteTable.class).fragmentInfo(Commons.plannerContext(rel));
}
public static FragmentInfo fragmentInfo(RelNode rel, RelMetadataQuery mq) {
return RelMetadataQueryEx.wrap(mq).getFragmentLocation(rel);
}
+
+ private OptimisticPlanningException planningException(BiRel rel, Exception cause, boolean splitLeft) {
+ String msg = "Failed to calculate physical distribution";
+
+ if (splitLeft)
+ return new OptimisticPlanningException(msg, new Edge(rel, rel.getLeft(), 0), cause);
+
+ return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 7fe6fbf..5579a14 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
import com.google.common.collect.ImmutableList;
+import java.util.List;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
@@ -25,7 +26,7 @@ import org.apache.calcite.rel.metadata.MetadataDef;
import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
/**
@@ -35,23 +36,11 @@ public class IgniteMetadata {
public static final RelMetadataProvider METADATA_PROVIDER =
ChainedRelMetadataProvider.of(
ImmutableList.of(
+ IgniteMdDerivedDistribution.SOURCE,
IgniteMdDistribution.SOURCE,
IgniteMdFragmentInfo.SOURCE,
DefaultRelMetadataProvider.INSTANCE));
- public interface DistributionTraitMetadata extends Metadata {
- MetadataDef<DistributionTraitMetadata> DEF = MetadataDef.of(DistributionTraitMetadata.class,
- DistributionTraitMetadata.Handler.class, IgniteMethod.DISTRIBUTION_TRAIT.method());
-
- /** Determines how the rows are distributed. */
- DistributionTrait getDistributionTrait();
-
- /** Handler API. */
- interface Handler extends MetadataHandler<DistributionTraitMetadata> {
- DistributionTrait getDistributionTrait(RelNode r, RelMetadataQuery mq);
- }
- }
-
public interface FragmentMetadata extends Metadata {
MetadataDef<FragmentMetadata> DEF = MetadataDef.of(FragmentMetadata.class,
FragmentMetadata.Handler.class, IgniteMethod.FRAGMENT_INFO.method());
@@ -64,4 +53,16 @@ public class IgniteMetadata {
FragmentInfo getFragmentInfo(RelNode r, RelMetadataQuery mq);
}
}
+
+ public interface DerivedDistribution extends Metadata {
+ MetadataDef<DerivedDistribution> DEF = MetadataDef.of(DerivedDistribution.class,
+ DerivedDistribution.Handler.class, IgniteMethod.DERIVED_DISTRIBUTIONS.method());
+
+ List<IgniteDistribution> deriveDistributions();
+
+ /** Handler API. */
+ interface Handler extends MetadataHandler<DerivedDistribution> {
+ List<IgniteDistribution> deriveDistributions(RelNode r, RelMetadataQuery mq);
+ }
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
index fb609ea..f0d3fe3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
@@ -16,7 +16,7 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import org.apache.ignite.internal.processors.query.calcite.util.Edge;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Edge;
/**
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index c93d04b..74d7682 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -16,7 +16,8 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import java.util.Arrays;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -27,7 +28,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.jetbrains.annotations.NotNull;
/**
@@ -35,22 +36,21 @@ import org.jetbrains.annotations.NotNull;
*/
public class RelMetadataQueryEx extends RelMetadataQuery {
private static final RelMetadataQueryEx PROTO = new RelMetadataQueryEx();
- private static final JaninoRelMetadataProvider PROVIDER = JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
+ public static final JaninoRelMetadataProvider PROVIDER = JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
static {
- PROVIDER.register(Arrays.asList(
- IgniteExchange.class,
- IgniteFilter.class,
- IgniteJoin.class,
- IgniteProject.class,
- IgniteTableScan.class,
- IgniteReceiver.class,
- IgniteSender.class
- ));
+ PROVIDER.register(ImmutableList.of(
+ IgniteExchange.class,
+ IgniteReceiver.class,
+ IgniteSender.class,
+ IgniteFilter.class,
+ IgniteProject.class,
+ IgniteJoin.class,
+ IgniteTableScan.class));
}
- private IgniteMetadata.DistributionTraitMetadata.Handler distributionTraitHandler;
private IgniteMetadata.FragmentMetadata.Handler sourceDistributionHandler;
+ private IgniteMetadata.DerivedDistribution.Handler derivedDistributionsHandler;
@SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
public static RelMetadataQueryEx instance() {
@@ -67,22 +67,22 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
private RelMetadataQueryEx(@NotNull RelMetadataQueryEx parent) {
super(PROVIDER, parent);
- distributionTraitHandler = parent.distributionTraitHandler;
sourceDistributionHandler = parent.sourceDistributionHandler;
+ derivedDistributionsHandler = parent.derivedDistributionsHandler;
}
private RelMetadataQueryEx(@NotNull RelMetadataQuery parent) {
super(PROVIDER, parent);
- distributionTraitHandler = PROTO.distributionTraitHandler;
sourceDistributionHandler = PROTO.sourceDistributionHandler;
+ derivedDistributionsHandler = PROTO.derivedDistributionsHandler;
}
private RelMetadataQueryEx() {
super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
- distributionTraitHandler = initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
sourceDistributionHandler = initialHandler(IgniteMetadata.FragmentMetadata.Handler.class);
+ derivedDistributionsHandler = initialHandler(IgniteMetadata.DerivedDistribution.Handler.class);
}
public FragmentInfo getFragmentLocation(RelNode rel) {
@@ -95,12 +95,12 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
}
}
- public DistributionTrait getDistributionTrait(RelNode rel) {
+ public List<IgniteDistribution> derivedDistributions(RelNode rel) {
for (;;) {
try {
- return distributionTraitHandler.getDistributionTrait(rel, this);
+ return derivedDistributionsHandler.deriveDistributions(rel, this);
} catch (JaninoRelMetadataProvider.NoHandler e) {
- distributionTraitHandler = revise(e.relClass, IgniteMetadata.DistributionTraitMetadata.DEF);
+ derivedDistributionsHandler = revise(e.relClass, IgniteMetadata.DerivedDistribution.DEF);
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
index 192e686..b81961f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
@@ -35,9 +35,7 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
/**
@@ -84,11 +82,11 @@ public class DistributedExecution implements QueryExecution {
RelTraitSet desired = rel.getTraitSet()
.replace(relRoot.collation)
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(RelDistributions.ANY)
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
} catch (SqlParseException | ValidationException e) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index dec15d4..5c12ffa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -38,6 +38,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.plan.volcano.VolcanoUtils;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
@@ -46,7 +47,9 @@ import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
@@ -70,9 +73,8 @@ import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.serialize.Graph;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.GraphToRelConverter;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
@@ -159,12 +161,14 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
metadataProvider = null;
validator = null;
+ RelMetadataQuery.THREAD_PROVIDERS.remove();
+
open = false;
}
private void ready() {
if (!open) {
- planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), context);
+ planner = VolcanoUtils.impatient(new VolcanoPlanner(frameworkConfig.getCostFactory(), context));
planner.setExecutor(executor);
metadataProvider = new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner);
@@ -213,7 +217,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
public RelNode convert(RelGraph graph) {
ready();
- RelOptCluster cluster = RelOptCluster.create(planner, createRexBuilder());
+ RelOptCluster cluster = createCluster(createRexBuilder());
RelBuilder relBuilder = createRelBuilder(cluster, createCatalogReader());
return new GraphToRelConverter(this, relBuilder, operatorTable).convert(graph);
@@ -224,7 +228,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
ready();
RexBuilder rexBuilder = createRexBuilder();
- RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+ RelOptCluster cluster = createCluster(rexBuilder);
SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
.withConfig(sqlToRelConverterConfig)
.withTrimUnusedFields(false)
@@ -242,8 +246,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
public QueryPlan plan(RelNode rel) {
ready();
- if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
- throw new IllegalArgumentException("IGNITE_CONVENTION is required.");
+ if (rel.getConvention() != IgniteConvention.INSTANCE)
+ throw new IllegalArgumentException("Physical node is required.");
return new Splitter().go((IgniteRel) rel);
}
@@ -251,10 +255,10 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
public Graph graph(RelNode rel) {
ready();
- if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
- throw new IllegalArgumentException("IGNITE_CONVENTION is required.");
+ if (rel.getConvention() != IgniteConvention.INSTANCE)
+ throw new IllegalArgumentException("Physical node is required.");
- return new RelToGraphConverter().convert((IgniteRel) rel);
+ return new RelToGraphConverter().go((IgniteRel) rel);
}
/** {@inheritDoc} */
@@ -277,7 +281,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
validator.setIdentifierExpansion(true);
RexBuilder rexBuilder = createRexBuilder();
- RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+ RelOptCluster cluster = createCluster(rexBuilder);
SqlToRelConverter.Config config = SqlToRelConverter
.configBuilder()
.withConfig(sqlToRelConverterConfig)
@@ -294,6 +298,15 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
return root2.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
}
+ private RelOptCluster createCluster(RexBuilder rexBuilder) {
+ RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+
+ cluster.setMetadataProvider(metadataProvider);
+ RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(metadataProvider));
+
+ return cluster;
+ }
+
/** {@inheritDoc} */
@Override public RelNode transform(int programIdx, RelTraitSet targetTraits, RelNode rel) {
ready();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
index dc20880..352980f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
@@ -27,9 +27,6 @@ import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
import org.apache.ignite.internal.processors.query.calcite.exchange.ExchangeProcessor;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.type.RowType;
/**
*
@@ -43,14 +40,13 @@ public final class PlannerContext implements Context {
private final GridKernalContext kernalContext;
private final CalciteQueryProcessor queryProcessor;
private final MappingService mappingService;
- private final TableDistributionService distributionService;
private final ExchangeProcessor exchangeProcessor;
private IgnitePlanner planner;
private PlannerContext(Context parentContext, Query query, AffinityTopologyVersion topologyVersion,
SchemaPlus schema, IgniteLogger logger, GridKernalContext kernalContext, CalciteQueryProcessor queryProcessor, MappingService mappingService,
- TableDistributionService distributionService, ExchangeProcessor exchangeProcessor) {
+ ExchangeProcessor exchangeProcessor) {
this.parentContext = parentContext;
this.query = query;
this.topologyVersion = topologyVersion;
@@ -59,7 +55,6 @@ public final class PlannerContext implements Context {
this.kernalContext = kernalContext;
this.queryProcessor = queryProcessor;
this.mappingService = mappingService;
- this.distributionService = distributionService;
this.exchangeProcessor = exchangeProcessor;
}
@@ -99,10 +94,6 @@ public final class PlannerContext implements Context {
return mappingService;
}
- public TableDistributionService distributionService() {
- return distributionService;
- }
-
public ExchangeProcessor exchangeProcessor() {
return exchangeProcessor;
}
@@ -125,10 +116,6 @@ public final class PlannerContext implements Context {
return mappingService.distributed(cacheId, topVer);
}
- public DistributionTrait distributionTrait(int cacheId, RowType rowType) {
- return distributionService.distribution(cacheId, rowType);
- }
-
public QueryProvider queryProvider() {
return null; // TODO
}
@@ -153,7 +140,6 @@ public final class PlannerContext implements Context {
private GridKernalContext kernalContext;
private CalciteQueryProcessor queryProcessor;
private MappingService mappingService;
- private TableDistributionService distributionService;
private ExchangeProcessor exchangeProcessor;
public Builder parentContext(Context parentContext) {
@@ -196,18 +182,13 @@ public final class PlannerContext implements Context {
return this;
}
- public Builder distributionService(TableDistributionService distributionService) {
- this.distributionService = distributionService;
- return this;
- }
-
public Builder exchangeProcessor(ExchangeProcessor exchangeProcessor) {
this.exchangeProcessor = exchangeProcessor;
return this;
}
public PlannerContext build() {
- return new PlannerContext(parentContext, query, topologyVersion, schema, logger, kernalContext, queryProcessor, mappingService, distributionService, exchangeProcessor);
+ return new PlannerContext(parentContext, query, topologyVersion, schema, logger, kernalContext, queryProcessor, mappingService, exchangeProcessor);
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
similarity index 61%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index a243f52..906c20e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rule;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+import org.apache.calcite.rel.rules.SubQueryRemoveRule;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
-import org.apache.ignite.internal.processors.query.calcite.exec.Interpretable;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.rule.FilterConverter;
+import org.apache.ignite.internal.processors.query.calcite.rule.JoinConverter;
+import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverter;
+import org.apache.ignite.internal.processors.query.calcite.rule.TableScanConverter;
/**
*
@@ -29,21 +32,21 @@ public enum PlannerPhase {
/** */
SUBQUERY_REWRITE("Sub-queries rewrites") {
@Override public RuleSet getRules(PlannerContext ctx) {
- return RuleSets.ofList(IgniteRules.SUBQUERY_REWRITE_RULES);
+ return RuleSets.ofList(
+ SubQueryRemoveRule.FILTER,
+ SubQueryRemoveRule.PROJECT,
+ SubQueryRemoveRule.JOIN);
}
},
/** */
- LOGICAL("Logical planning") {
+ OPTIMIZATION("Main optimization phase") {
@Override public RuleSet getRules(PlannerContext ctx) {
- return RuleSets.ofList(IgniteRules.logicalRules(ctx));
- }
- },
-
- /** */
- PHYSICAL("Execution tree building") {
- @Override public RuleSet getRules(PlannerContext ctx) {
- return RuleSets.ofList(Interpretable.RULES);
+ return RuleSets.ofList(
+ new TableScanConverter(),
+ new JoinConverter(),
+ new ProjectConverter(),
+ new FilterConverter());
}
};
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
similarity index 92%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
index a4a8db8..cddc4d5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.rule;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
/**
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
new file mode 100644
index 0000000..c4fad9f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.AbstractConverter;
+
+/**
+ *
+ */
+public class IgniteConvention extends Convention.Impl {
+ public static final Convention INSTANCE = new IgniteConvention();
+
+ private IgniteConvention() {
+ super("IGNITE", IgniteRel.class);
+ }
+
+ @Override public void register(RelOptPlanner planner) {
+ planner.addRule(AbstractConverter.ExpandConversionRule.INSTANCE);
+ }
+
+ @Override public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+ return fromTraits.contains(INSTANCE) && toTraits.contains(INSTANCE);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 51be890..f932020 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -16,52 +16,25 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
-import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
+import org.apache.calcite.rel.core.Exchange;
/**
*
*/
-public final class IgniteExchange extends SingleRel implements IgniteRel {
- /**
- * Creates a <code>SingleRel</code>.
- *
- * @param cluster Cluster this relational expression belongs to
- * @param traits Node traits.
- * @param input Input relational expression
- */
- public IgniteExchange(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
- super(cluster, traits, input);
+public class IgniteExchange extends Exchange implements IgniteRel {
+ public IgniteExchange(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RelDistribution distribution) {
+ super(cluster, traitSet, input, distribution);
}
- @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- double rowCount = mq.getRowCount(this);
- double bytesPerRow = getRowType().getFieldCount() * 4;
- return planner.getCostFactory().makeCost(
- Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
+ @Override public Exchange copy(RelTraitSet traitSet, RelNode newInput, RelDistribution newDistribution) {
+ return new IgniteExchange(getCluster(), traitSet, newInput, newDistribution);
}
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new IgniteExchange(getCluster(), traitSet, sole(inputs));
- }
-
- /** {@inheritDoc} */
- @Override public <T> T implement(RelImplementor<T> implementor) {
+ @Override public <T> T implement(Implementor<T> implementor) {
return implementor.implement(this);
}
-
- @Override public RelWriter explainTerms(RelWriter pw) {
- return super.explainTerms(pw)
- .item("distribution", getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 5a96897..2a0594e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,66 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.internal.processors.query.calcite.rel;
-import com.google.common.collect.ImmutableSet;
-import java.util.Objects;
-import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-
-public final class IgniteFilter extends Filter implements IgniteRel {
- private final Set<CorrelationId> variablesSet;
-
- public IgniteFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
- RexNode condition, Set<CorrelationId> variablesSet) {
- super(cluster, traitSet, child, condition);
- this.variablesSet = Objects.requireNonNull(variablesSet);
- }
-
- @Override public Set<CorrelationId> getVariablesSet() {
- return variablesSet;
- }
-
- @Override public IgniteFilter copy(RelTraitSet traitSet, RelNode input,
- RexNode condition) {
- return new IgniteFilter(getCluster(), traitSet, input, condition, variablesSet);
- }
-
- /** {@inheritDoc} */
- @Override public <T> T implement(RelImplementor<T> implementor) {
- return implementor.implement(this);
- }
- @Override public RelWriter explainTerms(RelWriter pw) {
- return super.explainTerms(pw)
- .itemIf("variablesSet", variablesSet, !variablesSet.isEmpty());
- }
-
- public static IgniteFilter create(Filter filter, RelNode input) {
- RexNode condition = filter.getCondition();
- Set<CorrelationId> variablesSet = filter.getVariablesSet();
-
- return create(input, condition, variablesSet);
- }
-
- public static IgniteFilter create(RelNode input, RexNode condition, Set<CorrelationId> variablesSet) {
- RelOptCluster cluster = input.getCluster();
- RelMetadataQuery mq = cluster.getMetadataQuery();
-
- RelTraitSet traits = cluster.traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
- .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.filter(mq, input, condition));
-
- return new IgniteFilter(cluster, traits, input, condition, ImmutableSet.copyOf(variablesSet));
- }
-}
\ No newline at end of file
+/**
+ *
+ */
+public class IgniteFilter extends Filter implements IgniteRel {
+ public IgniteFilter(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode condition) {
+ super(cluster, traits, input, condition);
+ }
+
+ @Override public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new IgniteFilter(getCluster(), traitSet, input, condition);
+ }
+
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
index e576897..344f4a2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,53 +13,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-
-public final class IgniteJoin extends Join implements IgniteRel {
- private final boolean semiJoinDone;
-
- public IgniteJoin(
- RelOptCluster cluster,
- RelTraitSet traitSet,
- RelNode left,
- RelNode right,
- RexNode condition,
- Set<CorrelationId> variablesSet,
- JoinRelType joinType,
- boolean semiJoinDone) {
- super(cluster, traitSet, left, right, condition, variablesSet, joinType);
- this.semiJoinDone = semiJoinDone;
- }
- @Override public IgniteJoin copy(RelTraitSet traitSet, RexNode conditionExpr,
- RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
- return new IgniteJoin(getCluster(), traitSet, left, right, conditionExpr, variablesSet, joinType, semiJoinDone);
- }
-
- /** {@inheritDoc} */
- @Override public <T> T implement(RelImplementor<T> implementor) {
- return implementor.implement(this);
- }
+/**
+ * Join relational node implementing {@link IgniteRel}; {@code copy} accepts a {@code semiJoinDone} flag but does not use it.
+ */
+public class IgniteJoin extends Join implements IgniteRel {
+ public IgniteJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+ super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+ }
- @Override public RelWriter explainTerms(RelWriter pw) {
- // Don't ever print semiJoinDone=false. This way, we
- // don't clutter things up in optimizers that don't use semi-joins.
- return super.explainTerms(pw)
- .itemIf("semiJoinDone", semiJoinDone, semiJoinDone);
- }
+ @Override public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+ return new IgniteJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType);
+ }
- @Override public boolean isSemiJoinDone() {
- return semiJoinDone;
- }
-}
\ No newline at end of file
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index fe0d55e..03fe6ca 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
@@ -21,55 +21,22 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-
-public final class IgniteProject extends Project implements IgniteRel {
- public IgniteProject(
- RelOptCluster cluster,
- RelTraitSet traitSet,
- RelNode input,
- List<? extends RexNode> projects,
- RelDataType rowType) {
- super(cluster, traitSet, input, projects, rowType);
- }
-
- @Override public IgniteProject copy(RelTraitSet traitSet, RelNode input,
- List<RexNode> projects, RelDataType rowType) {
- return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
- }
-
- /** {@inheritDoc} */
- @Override public <T> T implement(RelImplementor<T> implementor) {
- return implementor.implement(this);
- }
- /** Creates a LogicalProject. */
- public static IgniteProject create(final RelNode input, List<? extends RexNode> projects, List<String> fieldNames) {
- final RelOptCluster cluster = input.getCluster();
- final RelDataType rowType =
- RexUtil.createStructType(cluster.getTypeFactory(), projects,
- fieldNames, SqlValidatorUtil.F_SUGGESTER);
- return create(input, projects, rowType);
- }
-
- public static IgniteProject create(Project project, RelNode input) {
- return create(input, project.getProjects(), project.getRowType());
- }
-
- public static IgniteProject create(RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
- RelOptCluster cluster = input.getCluster();
- RelMetadataQuery mq = cluster.getMetadataQuery();
- RelTraitSet traits = cluster.traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
- .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.project(mq, input, projects));
-
- return new IgniteProject(cluster, traits, input, projects, rowType);
- }
+/**
+ * Project relational node implementing {@link IgniteRel}; {@code copy} rebuilds it on the same cluster and {@code implement} passes it to the given {@link Implementor}.
+ */
+public class IgniteProject extends Project implements IgniteRel {
+ public IgniteProject(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+
+ @Override public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+ return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
+ }
+
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 98cf908..60d5477 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -16,41 +16,46 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
+import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
/**
*
*/
-public final class IgniteReceiver extends AbstractRelNode implements IgniteRel {
- private final Source source;
-
- /**
- * @param cluster Cluster this relational expression belongs to
- * @param traits Trait set.
- */
- public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Source source) {
+public class IgniteReceiver extends AbstractRelNode implements IgniteRel {
+ private RelSource source; // Non-final: can be replaced after construction via source(RelSource).
+
+ public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, RelSource source) {
super(cluster, traits);
+
this.rowType = rowType;
this.source = source;
}
- /** {@inheritDoc} */
- @Override public <T> T implement(RelImplementor<T> implementor) {
- return implementor.implement(this);
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new IgniteReceiver(getCluster(), traitSet, rowType, source); // The inputs argument is not used; row type and source are carried over.
}
- public DistributionTrait distribution() {
- return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
}
- public Source source() {
+ public RelSource source() {
return source;
}
+
+ public void source(RelSource source) {
+ this.source = source;
+ }
+
+ public IgniteDistribution distribution() {
+ return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index dbfbb3f..b5d876f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,22 +16,11 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
/**
*
*/
public interface IgniteRel extends RelNode {
- Convention IGNITE_CONVENTION = new Convention.Impl("IGNITE_LOGICAL", IgniteRel.class) {
- /** */
- @Override public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
- return fromTraits.containsIfApplicable(IGNITE_CONVENTION)
- && toTraits.containsIfApplicable(IGNITE_CONVENTION); // Enables trait definition conversion
- }
- };
-
- <T> T implement(RelImplementor<T> implementor);
+ <T> T implement(Implementor<T> implementor); // The node classes changed in this commit implement it as implementor.implement(this).
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
similarity index 78%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
index 9734885..7d3efc9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
@@ -14,17 +14,10 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.util;
+package org.apache.ignite.internal.processors.query.calcite.rel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
/**
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index e89712a..e50b7bf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -21,59 +21,37 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
/**
*
*/
-public final class IgniteSender extends SingleRel implements IgniteRel {
- private Target target;
+public class IgniteSender extends SingleRel implements IgniteRel {
+ private RelTarget target;
- /**
- * Creates a <code>SingleRel</code>.
- * @param cluster Cluster this relational expression belongs to
- * @param traits Trait set.
- * @param input Input relational expression
- */
- public IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+ public IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelTarget target) {
super(cluster, traits, input);
+
+ this.target = target;
}
- private IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input, Target target) {
+ public IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input) { // Leaves target unset; assign it later through target(RelTarget).
super(cluster, traits, input);
-
- this.target = target;
}
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new IgniteSender(getCluster(), traitSet, sole(inputs), target);
}
- /** {@inheritDoc} */
- @Override public <T> T implement(RelImplementor<T> implementor) {
+ @Override public <T> T implement(Implementor<T> implementor) {
return implementor.implement(this);
}
- public void init(Target target) {
- this.target = target;
- }
-
- public Target target() {
+ public RelTarget target() {
return target;
}
- public static IgniteSender create(RelNode input, Target target) {
- RelOptCluster cluster = input.getCluster();
- RelMetadataQuery mq = cluster.getMetadataQuery();
-
- RelTraitSet traits = cluster.traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
- .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.distribution(input, mq));
-
- return new IgniteSender(cluster, traits, input, target);
+ public void target(RelTarget target) {
+ this.target = target;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index c529aee..5f01899 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
@@ -22,29 +22,20 @@ import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-
-public final class IgniteTableScan extends TableScan implements IgniteRel {
- public IgniteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
- super(cluster, traitSet, table);
- }
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- assert inputs.isEmpty();
-
- return this;
- }
+/**
+ * Table scan node implementing {@link IgniteRel}; {@code copy} returns this same instance and ignores the requested trait set and inputs.
+ */
+public class IgniteTableScan extends TableScan implements IgniteRel {
+ public IgniteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+ super(cluster, traitSet, table);
+ }
- /** {@inheritDoc} */
- @Override public <T> T implement(RelImplementor<T> implementor) {
- return implementor.implement(this);
- }
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return this; // NOTE(review): traitSet is ignored here - confirm no caller relies on copy() to change traits.
+ }
- public FragmentInfo fragmentInfo() {
- return getTable().unwrap(IgniteTable.class)
- .fragmentInfo(Commons.plannerContext(getCluster().getPlanner().getContext()));
- }
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
similarity index 58%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
index c67964f..50da38c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
@@ -14,32 +14,29 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.rel;
/**
*
*/
-public enum DistributionType {
- HASH("hash"),
- RANDOM("random"),
- BROADCAST("broadcast"),
- SINGLE("single"),
- ANY("any");
-
- /** */
- private final String description;
-
- /**
- *
- */
- DistributionType(String description) {
- this.description = description;
- }
+public interface Implementor<T> extends RelOp<IgniteRel, T> {
+ T implement(IgniteSender rel);
+
+ T implement(IgniteFilter rel);
+
+ T implement(IgniteProject rel);
+
+ T implement(IgniteJoin rel);
+
+ T implement(IgniteTableScan rel);
+
+ T implement(IgniteReceiver rel);
+
+ T implement(IgniteExchange rel);
+
+ T implement(IgniteRel other); // Accepts any IgniteRel; presumably the fallback for node types without a dedicated overload - TODO confirm.
- /**
- *
- */
- @Override public String toString() {
- return description;
+ @Override default T go(IgniteRel rel) {
+ return rel.implement(this); // Delegates to the node, which calls back implement(...) with itself as the argument.
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelOp.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/RelOp.java
similarity index 93%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelOp.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/RelOp.java
index 6bc90bf..fb073c9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelOp.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/RelOp.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.util;
+package org.apache.ignite.internal.processors.query.calcite.rel;
import org.apache.calcite.rel.RelNode;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractVariableConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractVariableConverter.java
new file mode 100644
index 0000000..02c7d53
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractVariableConverter.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Converter rule whose conversion may yield several nodes: {@code onMatch} registers every node from {@code convert(rel, false)}, while {@code convert(RelNode)} returns {@code F.first} of {@code convert(rel, true)}.
+ */
+public abstract class AbstractVariableConverter extends ConverterRule {
+ protected AbstractVariableConverter(Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix) {
+ super(clazz, in, out, descriptionPrefix);
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ RelNode rel = call.rel(0);
+ if (rel.getTraitSet().contains(getInTrait())) { // Only nodes carrying the rule's input trait are converted.
+ for (RelNode newRel : convert(rel, false))
+ call.transformTo(newRel);
+ }
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ return F.first(convert(rel, true));
+ }
+
+ public abstract List<RelNode> convert(RelNode rel, boolean firstOnly);
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterConverter.java
new file mode 100644
index 0000000..6a6ce47
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterConverter.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDerivedDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Converts a {@link LogicalFilter} into {@link IgniteFilter}s, one per distribution derived for its converted input (or a single one for the first distribution when {@code firstOnly} is set).
+ */
+public class FilterConverter extends AbstractVariableConverter {
+ public FilterConverter() {
+ super(LogicalFilter.class, Convention.NONE, IgniteConvention.INSTANCE, "FilterConverter");
+ }
+
+ @Override public List<RelNode> convert(RelNode rel, boolean firstOnly) {
+ LogicalFilter filter = (LogicalFilter) rel;
+
+ RelNode input = convert(filter.getInput(), IgniteConvention.INSTANCE);
+
+ RelOptCluster cluster = rel.getCluster();
+ RelMetadataQuery mq = cluster.getMetadataQuery();
+
+ List<IgniteDistribution> distrs = IgniteMdDerivedDistribution.deriveDistributions(input, IgniteConvention.INSTANCE, mq);
+
+ return firstOnly ? F.asList(create(filter, input, F.first(distrs))) : // NOTE(review): F.first(distrs) presumably yields null for an empty list - confirm distrs is never empty.
+ Commons.transform(distrs, d -> create(filter, input, d));
+ }
+
+ private static IgniteFilter create(LogicalFilter filter, RelNode input, IgniteDistribution distr) {
+ RelTraitSet traits = filter.getTraitSet()
+ .replace(distr)
+ .replace(IgniteConvention.INSTANCE);
+
+ return new IgniteFilter(filter.getCluster(), traits, convert(input, distr), filter.getCondition());
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
deleted file mode 100644
index 4d82c98..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RelOp;
-
-/**
- *
- */
-public class IgniteFilterRule extends RelOptRule {
- public static final RelOptRule INSTANCE = new IgniteFilterRule();
-
- private IgniteFilterRule() {
- super(Commons.any(LogicalFilter.class, RelNode.class), "IgniteFilterRule");
- }
-
- @Override public void onMatch(RelOptRuleCall call) {
- LogicalFilter filter = call.rel(0);
- RelOptCluster cluster = filter.getCluster();
- RelNode input = filter.getInput();
-
- final RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION);
-
- RelNode converted = convert(input, traitSet);
-
- RelOp<LogicalFilter, Boolean> transformOp = Commons.transformSubset(call, converted, IgniteFilter::create);
-
- if (!transformOp.go(filter))
- call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(converted)));
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
deleted file mode 100644
index 3f380d3..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import java.util.Objects;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.SINGLE;
-
-/**
- *
- */
-public class IgniteJoinRule extends RelOptRule {
- public static final RelOptRule INSTANCE = new IgniteJoinRule();
-
- public IgniteJoinRule() {
- super(Commons.any(Join.class, RelNode.class), RelFactories.LOGICAL_BUILDER, "IgniteJoinRule");
- }
-
- @Override public void onMatch(RelOptRuleCall call) {
- Join join = call.rel(0);
-
- RelOptCluster cluster = join.getCluster();
-
- RelTraitSet traitSet = cluster.traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION);
-
- RelNode left = convert(join.getLeft(), traitSet);
- RelNode right = convert(join.getRight(), traitSet);
-
- RelMetadataQuery mq = call.getMetadataQuery();
-
- List<DistributionTrait> leftDerived;
- List<DistributionTrait> rightDerived;
-
- if ((leftDerived = IgniteDistributions.deriveDistributions(left, mq)).isEmpty()
- || (rightDerived = IgniteDistributions.deriveDistributions(right, mq)).isEmpty()) {
- call.transformTo(join.copy(join.getTraitSet(), ImmutableList.of(left, right)));
-
- return;
- }
-
- List<DistributionTrait> leftDists = Commons.concat(leftDerived,
- IgniteDistributions.hash(join.analyzeCondition().leftKeys));
-
- List<DistributionTrait> rightDists = Commons.concat(rightDerived,
- IgniteDistributions.hash(join.analyzeCondition().rightKeys));
-
- for (DistributionTrait leftDist0 : leftDists) {
- for (DistributionTrait rightDist0 : rightDists) {
- if (canTransform(join, leftDist0, rightDist0))
- transform(call, join, mq, leftDist0, rightDist0);
- }
- }
- }
-
- private void transform(RelOptRuleCall call, Join join, RelMetadataQuery mq, DistributionTrait leftDist, DistributionTrait rightDist) {
- RelOptCluster cluster = join.getCluster();
-
- RelTraitSet leftTraits = cluster.traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
- .replace(leftDist);
-
- RelTraitSet rightTraits = cluster.traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
- .replace(rightDist);
-
- RelNode left = convert(join.getLeft(), leftTraits);
- RelNode right = convert(join.getRight(), rightTraits);
-
- RelTraitSet traitSet = cluster.traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
- .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.join(mq, left, right, join.analyzeCondition(), join.getJoinType()));
-
- call.transformTo(new IgniteJoin(cluster, traitSet, left, right,
- join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone()));
- }
-
- private boolean canTransform(Join join, DistributionTrait leftDist, DistributionTrait rightDist) {
- if (leftDist.type() == BROADCAST
- && rightDist.type() == BROADCAST)
- return true;
-
- if (rightDist.type() == SINGLE
- && leftDist.type() == SINGLE)
- return true;
-
- if (leftDist.type() == BROADCAST
- && rightDist.type() == HASH
- && Objects.equals(rightDist.keys(), join.analyzeCondition().rightKeys))
- return true;
-
- if (rightDist.type() == BROADCAST
- && leftDist.type() == HASH
- && Objects.equals(leftDist.keys(), join.analyzeCondition().leftKeys))
- return true;
-
- if (leftDist.type() == HASH
- && rightDist.type() == HASH
- && Objects.equals(leftDist.keys(), join.analyzeCondition().leftKeys)
- && Objects.equals(rightDist.keys(), join.analyzeCondition().rightKeys)
- && Objects.equals(rightDist.destinationFunctionFactory(), leftDist.destinationFunctionFactory()))
- return true;
-
- return false;
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
deleted file mode 100644
index f238d08..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RelOp;
-
-/**
- *
- */
-public class IgniteProjectRule extends RelOptRule {
- public static final RelOptRule INSTANCE = new IgniteProjectRule();
-
- private <R extends RelNode> IgniteProjectRule() {
- super(Commons.any(LogicalProject.class, RelNode.class), RelFactories.LOGICAL_BUILDER, "IgniteProjectRule");
- }
-
- @Override public void onMatch(RelOptRuleCall call) {
- LogicalProject project = call.rel(0);
- RelOptCluster cluster = project.getCluster();
- RelNode input = project.getInput();
-
- final RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION);
-
- RelNode converted = convert(input, traitSet);
-
- RelOp<LogicalProject, Boolean> transformOp = Commons.transformSubset(call, converted, IgniteProject::create);
-
- if (!transformOp.go(project))
- call.transformTo(project.copy(project.getTraitSet(), ImmutableList.of(converted)));
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
deleted file mode 100644
index cf514d5..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.volcano.AbstractConverter;
-import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
-import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
-import org.apache.calcite.rel.rules.AggregateMergeRule;
-import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
-import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule;
-import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
-import org.apache.calcite.rel.rules.AggregateRemoveRule;
-import org.apache.calcite.rel.rules.AggregateStarTableRule;
-import org.apache.calcite.rel.rules.AggregateValuesRule;
-import org.apache.calcite.rel.rules.CalcRemoveRule;
-import org.apache.calcite.rel.rules.DateRangeRules;
-import org.apache.calcite.rel.rules.ExchangeRemoveConstantKeysRule;
-import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
-import org.apache.calcite.rel.rules.FilterJoinRule;
-import org.apache.calcite.rel.rules.FilterMergeRule;
-import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
-import org.apache.calcite.rel.rules.FilterTableScanRule;
-import org.apache.calcite.rel.rules.IntersectToDistinctRule;
-import org.apache.calcite.rel.rules.JoinCommuteRule;
-import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
-import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
-import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
-import org.apache.calcite.rel.rules.ProjectMergeRule;
-import org.apache.calcite.rel.rules.ProjectRemoveRule;
-import org.apache.calcite.rel.rules.ProjectToWindowRule;
-import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
-import org.apache.calcite.rel.rules.PruneEmptyRules;
-import org.apache.calcite.rel.rules.ReduceExpressionsRule;
-import org.apache.calcite.rel.rules.SemiJoinRule;
-import org.apache.calcite.rel.rules.SortJoinTransposeRule;
-import org.apache.calcite.rel.rules.SortProjectTransposeRule;
-import org.apache.calcite.rel.rules.SortRemoveConstantKeysRule;
-import org.apache.calcite.rel.rules.SortRemoveRule;
-import org.apache.calcite.rel.rules.SortUnionTransposeRule;
-import org.apache.calcite.rel.rules.SubQueryRemoveRule;
-import org.apache.calcite.rel.rules.TableScanRule;
-import org.apache.calcite.rel.rules.UnionMergeRule;
-import org.apache.calcite.rel.rules.UnionPullUpConstantsRule;
-import org.apache.calcite.rel.rules.UnionToDistinctRule;
-import org.apache.calcite.rel.rules.ValuesReduceRule;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-
-/**
- *
- */
-public class IgniteRules {
- public static final List<RelOptRule> BASE_RULES = ImmutableList.of(
- AggregateStarTableRule.INSTANCE,
- AggregateStarTableRule.INSTANCE2,
- TableScanRule.INSTANCE,
- ProjectMergeRule.INSTANCE,
- FilterTableScanRule.INSTANCE,
- ProjectFilterTransposeRule.INSTANCE,
- FilterProjectTransposeRule.INSTANCE,
- FilterJoinRule.FILTER_ON_JOIN,
- JoinPushExpressionsRule.INSTANCE,
- AggregateExpandDistinctAggregatesRule.INSTANCE,
- AggregateReduceFunctionsRule.INSTANCE,
- FilterAggregateTransposeRule.INSTANCE,
- ProjectWindowTransposeRule.INSTANCE,
- JoinCommuteRule.INSTANCE,
- JoinPushThroughJoinRule.RIGHT,
- JoinPushThroughJoinRule.LEFT,
- SortProjectTransposeRule.INSTANCE,
- SortJoinTransposeRule.INSTANCE,
- SortRemoveConstantKeysRule.INSTANCE,
- SortUnionTransposeRule.INSTANCE,
- ExchangeRemoveConstantKeysRule.EXCHANGE_INSTANCE,
- ExchangeRemoveConstantKeysRule.SORT_EXCHANGE_INSTANCE);
-
- public static final List<RelOptRule> ABSTRACT_RULES = ImmutableList.of(
- AggregateProjectPullUpConstantsRule.INSTANCE2,
- UnionPullUpConstantsRule.INSTANCE,
- PruneEmptyRules.UNION_INSTANCE,
- PruneEmptyRules.INTERSECT_INSTANCE,
- PruneEmptyRules.MINUS_INSTANCE,
- PruneEmptyRules.PROJECT_INSTANCE,
- PruneEmptyRules.FILTER_INSTANCE,
- PruneEmptyRules.SORT_INSTANCE,
- PruneEmptyRules.AGGREGATE_INSTANCE,
- PruneEmptyRules.JOIN_LEFT_INSTANCE,
- PruneEmptyRules.JOIN_RIGHT_INSTANCE,
- PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
- UnionMergeRule.INSTANCE,
- UnionMergeRule.INTERSECT_INSTANCE,
- UnionMergeRule.MINUS_INSTANCE,
- ProjectToWindowRule.PROJECT,
- FilterMergeRule.INSTANCE,
- DateRangeRules.FILTER_INSTANCE,
- IntersectToDistinctRule.INSTANCE);
-
- public static final List<RelOptRule> ABSTRACT_RELATIONAL_RULES = ImmutableList.of(
- FilterJoinRule.FILTER_ON_JOIN,
- FilterJoinRule.JOIN,
- AbstractConverter.ExpandConversionRule.INSTANCE,
- JoinCommuteRule.INSTANCE,
- SemiJoinRule.PROJECT,
- SemiJoinRule.JOIN,
- AggregateRemoveRule.INSTANCE,
- UnionToDistinctRule.INSTANCE,
- ProjectRemoveRule.INSTANCE,
- AggregateJoinTransposeRule.INSTANCE,
- AggregateMergeRule.INSTANCE,
- AggregateProjectMergeRule.INSTANCE,
- CalcRemoveRule.INSTANCE,
- SortRemoveRule.INSTANCE);
-
- public static final List<RelOptRule> CONSTANT_REDUCTION_RULES = ImmutableList.of(
- ReduceExpressionsRule.PROJECT_INSTANCE,
- ReduceExpressionsRule.FILTER_INSTANCE,
- ReduceExpressionsRule.CALC_INSTANCE,
- ReduceExpressionsRule.WINDOW_INSTANCE,
- ReduceExpressionsRule.JOIN_INSTANCE,
- ValuesReduceRule.FILTER_INSTANCE,
- ValuesReduceRule.PROJECT_FILTER_INSTANCE,
- ValuesReduceRule.PROJECT_INSTANCE,
- AggregateValuesRule.INSTANCE);
-
- public static final List<RelOptRule> SUBQUERY_REWRITE_RULES = ImmutableList.of(
- SubQueryRemoveRule.FILTER,
- SubQueryRemoveRule.PROJECT,
- SubQueryRemoveRule.JOIN);
-
- public static final List<RelOptRule> IGNITE_RULES = ImmutableList.of(
- IgniteFilterRule.INSTANCE,
- IgniteProjectRule.INSTANCE,
- IgniteJoinRule.INSTANCE);
-
- public static List<RelOptRule> logicalRules(PlannerContext ctx) {
- return ImmutableList.<RelOptRule>builder()
- .addAll(BASE_RULES)
- .addAll(ABSTRACT_RULES)
- .addAll(ABSTRACT_RELATIONAL_RULES)
- .addAll(IGNITE_RULES)
- .build();
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
new file mode 100644
index 0000000..00a2bd9
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDerivedDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class JoinConverter extends AbstractVariableConverter {
+ public JoinConverter() {
+ super(LogicalJoin.class, Convention.NONE, IgniteConvention.INSTANCE, "JoinConverter");
+ }
+
+ @Override public List<RelNode> convert(RelNode rel, boolean firstOnly) {
+ LogicalJoin join = (LogicalJoin) rel;
+ // Bring both inputs into the Ignite convention before asking for their distributions.
+ RelNode left = convert(join.getLeft(), IgniteConvention.INSTANCE);
+ RelNode right = convert(join.getRight(), IgniteConvention.INSTANCE);
+
+ RelOptCluster cluster = join.getCluster();
+ RelMetadataQuery mq = cluster.getMetadataQuery();
+ // Each side's candidate distributions must be derived from its own input node (right from right, not left).
+ List<IgniteDistribution> leftTraits = IgniteMdDerivedDistribution.deriveDistributions(left, IgniteConvention.INSTANCE, mq);
+ List<IgniteDistribution> rightTraits = IgniteMdDerivedDistribution.deriveDistributions(right, IgniteConvention.INSTANCE, mq);
+ // Pair the left/right candidates for this join's condition and join type.
+ List<IgniteDistributions.BiSuggestion> suggestions = IgniteDistributions.suggestJoin(leftTraits, rightTraits, join.analyzeCondition(), join.getJoinType());
+ // One IgniteJoin per suggestion, or only the first when firstOnly is set. NOTE(review): confirm suggestions cannot be empty here.
+ return firstOnly ? F.asList(create(join, left, right, F.first(suggestions))) :
+ Commons.transform(suggestions, s -> create(join, left, right, s));
+ }
+
+ private static RelNode create(LogicalJoin join, RelNode left, RelNode right, IgniteDistributions.BiSuggestion suggest) {
+ left = convert(left, suggest.left());
+ right = convert(right, suggest.right());
+
+ RelTraitSet traitSet = join.getTraitSet().replace(IgniteConvention.INSTANCE).replace(suggest.out());
+
+ return new IgniteJoin(join.getCluster(), traitSet, left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType());
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverter.java
new file mode 100644
index 0000000..da281a0
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverter.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDerivedDistribution;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class ProjectConverter extends AbstractVariableConverter {
+ public ProjectConverter() {
+ super(LogicalProject.class, Convention.NONE, IgniteConvention.INSTANCE, "ProjectConverter");
+ }
+
+ @Override public List<RelNode> convert(RelNode rel, boolean firstOnly) {
+ LogicalProject project = (LogicalProject) rel;
+
+ RelNode input = convert(project.getInput(), IgniteConvention.INSTANCE);
+
+ RelOptCluster cluster = rel.getCluster();
+ RelMetadataQuery mq = cluster.getMetadataQuery();
+
+ List<IgniteDistribution> distrs = IgniteMdDerivedDistribution.deriveDistributions(input, IgniteConvention.INSTANCE, mq);
+
+ return firstOnly ? F.asList(create(project, input, F.first(distrs))) :
+ Commons.transform(distrs, d -> create(project, input, d));
+ }
+
+ private static IgniteProject create(LogicalProject project, RelNode input, IgniteDistribution distr) {
+ RelTraitSet traits = project.getTraitSet()
+ .replace(IgniteMdDistribution.project(input.getRowType(), distr, project.getProjects()))
+ .replace(IgniteConvention.INSTANCE);
+
+ return new IgniteProject(project.getCluster(), traits, convert(input, distr), project.getProjects(), project.getRowType());
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableScanConverter.java
similarity index 50%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableScanConverter.java
index f9e4a8d..7665a78 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableScanConverter.java
@@ -14,27 +14,28 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
+package org.apache.ignite.internal.processors.query.calcite.rule;
-import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
/**
*
*/
-public class TableScanNode extends RelGraphNode {
- private final List<String> tableName;
-
- private TableScanNode(List<String> tableName) {
- this.tableName = tableName;
+public class TableScanConverter extends ConverterRule {
+ public TableScanConverter() {
+ super(LogicalTableScan.class, Convention.NONE, IgniteConvention.INSTANCE, "TableScanConverter");
}
- public static TableScanNode create(IgniteTableScan rel) {
- return new TableScanNode(rel.getTable().getQualifiedName());
- }
+ @Override public RelNode convert(RelNode rel) {
+ LogicalTableScan scan = (LogicalTableScan) rel;
- @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
- return ctx.getSchema().getTableForMember(tableName).toRel(ctx);
+ RelTraitSet traitSet = scan.getTraitSet().replace(IgniteConvention.INSTANCE);
+ return new IgniteTableScan(rel.getCluster(), traitSet, scan.getTable());
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
index 18c8e5e..271c5f6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -55,7 +56,10 @@ public class IgniteSchema extends AbstractSchema {
* @param cacheInfo Cache info.
*/
public void onSqlTypeCreate(GridQueryTypeDescriptor typeDesc, GridCacheContextInfo cacheInfo) {
- addTable(new IgniteTable(typeDesc.tableName(), cacheInfo.name(), Commons.rowType(typeDesc)));
+ Object identityKey = cacheInfo.config().getCacheMode() == CacheMode.PARTITIONED ?
+ cacheInfo.cacheContext().group().affinity().similarAffinityKey() : null;
+
+ addTable(new IgniteTable(typeDesc.tableName(), cacheInfo.name(), Commons.rowType(typeDesc), identityKey));
}
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 14b8191..56e9302 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -17,25 +17,33 @@
package org.apache.ignite.internal.processors.query.calcite.schema;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.AffinityFactory;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.internal.CU;
/** */
@@ -43,11 +51,13 @@ public class IgniteTable extends AbstractTable implements TranslatableTable, Sca
private final String tableName;
private final String cacheName;
private final RowType rowType;
+ private final Object identityKey;
- public IgniteTable(String tableName, String cacheName, RowType rowType) {
+ public IgniteTable(String tableName, String cacheName, RowType rowType, Object identityKey) {
this.tableName = tableName;
this.cacheName = cacheName;
this.rowType = rowType;
+ this.identityKey = identityKey;
}
/**
@@ -69,17 +79,30 @@ public class IgniteTable extends AbstractTable implements TranslatableTable, Sca
return rowType.asRelDataType(typeFactory);
}
+ @Override public Statistic getStatistic() {
+ return new TableStatistics();
+ }
+
/** {@inheritDoc} */
@Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
RelOptCluster cluster = context.getCluster();
- PlannerContext ctx = Commons.plannerContext(cluster.getPlanner().getContext());
- RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION)
- .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(ctx));
- return new IgniteTableScan(cluster, traitSet, relOptTable);
+ RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
+ .replaceIf(DistributionTraitDef.INSTANCE, this::getDistribution);
+
+ return new LogicalTableScan(cluster, traitSet, relOptTable);
+ }
+
+ public IgniteDistribution getDistribution() {
+ Object key = identityKey();
+
+ if (key == null)
+ return IgniteDistributions.broadcast();
+
+ return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(CU.cacheId(cacheName), key));
}
- public DistributionTrait distributionTrait(PlannerContext context) {
- return context.distributionTrait(CU.cacheId(cacheName), rowType);
+ protected Object identityKey() {
+ return identityKey;
}
public FragmentInfo fragmentInfo(PlannerContext ctx) {
@@ -89,4 +112,26 @@ public class IgniteTable extends AbstractTable implements TranslatableTable, Sca
@Override public Enumerable<Object[]> scan(DataContext root) {
throw new AssertionError(); // TODO
}
+
+ private class TableStatistics implements Statistic {
+ @Override public Double getRowCount() {
+ return null;
+ }
+
+ @Override public boolean isKey(ImmutableBitSet columns) {
+ return false;
+ }
+
+ @Override public List<RelReferentialConstraint> getReferentialConstraints() {
+ return ImmutableList.of();
+ }
+
+ @Override public List<RelCollation> getCollations() {
+ return ImmutableList.of();
+ }
+
+ @Override public RelDistribution getDistribution() {
+ return IgniteTable.this.getDistribution();
+ }
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ConversionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ConversionContext.java
index 19ccfc2..0d8399e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ConversionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ConversionContext.java
@@ -16,10 +16,10 @@
package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.serialize.expression.ExpToRexTranslator;
/**
@@ -30,7 +30,7 @@ public interface ConversionContext extends RelOptTable.ToRelContext {
RelOptSchema getSchema();
- Context getContext();
+ PlannerContext getContext();
ExpToRexTranslator getExpressionTranslator();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/FilterNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/FilterNode.java
index 82041d6..1fd3da2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/FilterNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/FilterNode.java
@@ -16,37 +16,43 @@
package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.serialize.expression.Expression;
import org.apache.ignite.internal.processors.query.calcite.serialize.expression.RexToExpTranslator;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
public class FilterNode extends RelGraphNode {
- private final int[] variables;
private final Expression condition;
- private FilterNode(Expression condition, int[] variables) {
- this.variables = variables;
+ private FilterNode(Expression condition) {
this.condition = condition;
}
public static FilterNode create(IgniteFilter rel, RexToExpTranslator expTranslator) {
- return new FilterNode(expTranslator.translate(rel.getCondition()),
- rel.getVariablesSet().stream().mapToInt(CorrelationId::getId).toArray());
+ return new FilterNode(expTranslator.translate(rel.getCondition()));
}
@Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
- return IgniteFilter.create(
- F.first(children),
- condition.implement(ctx.getExpressionTranslator()),
- Arrays.stream(variables).mapToObj(CorrelationId::new).collect(Collectors.toSet()));
+ RelNode input = F.first(children);
+ RexNode condition = this.condition.implement(ctx.getExpressionTranslator());
+ RelOptCluster cluster = input.getCluster();
+ RelMetadataQuery mq = cluster.getMetadataQuery();
+
+ RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
+ .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.filter(mq, input, condition));
+
+ return new IgniteFilter(cluster, traits, input, condition);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/GraphToRelConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/GraphToRelConverter.java
index 5de192a..f7574bf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/GraphToRelConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/GraphToRelConverter.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
@@ -29,7 +28,9 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.tools.RelBuilder;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.serialize.expression.ExpToRexTranslator;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
/**
@@ -58,8 +59,8 @@ public class GraphToRelConverter implements ConversionContext {
return relBuilder.getRelOptSchema();
}
- @Override public Context getContext() {
- return getCluster().getPlanner().getContext();
+ @Override public PlannerContext getContext() {
+ return Commons.plannerContext(getCluster().getPlanner().getContext());
}
@Override public ExpToRexTranslator getExpressionTranslator() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/JoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/JoinNode.java
index 587fb35..c26d2c7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/JoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/JoinNode.java
@@ -34,22 +34,19 @@ public class JoinNode extends RelGraphNode {
private final Expression condition;
private final int[] variables;
private final JoinRelType joinType;
- private final boolean semiDone;
- private JoinNode(RelTraitSet traits, Expression condition, int[] variables, JoinRelType joinType, boolean semiDone) {
+ private JoinNode(RelTraitSet traits, Expression condition, int[] variables, JoinRelType joinType) {
super(traits);
this.condition = condition;
this.variables = variables;
this.joinType = joinType;
- this.semiDone = semiDone;
}
public static JoinNode create(IgniteJoin rel, RexToExpTranslator expTranslator) {
return new JoinNode(rel.getTraitSet(),
expTranslator.translate(rel.getCondition()),
rel.getVariablesSet().stream().mapToInt(CorrelationId::getId).toArray(),
- rel.getJoinType(),
- rel.isSemiJoinDone());
+ rel.getJoinType());
}
@Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
@@ -64,7 +61,6 @@ public class JoinNode extends RelGraphNode {
right,
ctx.getExpressionTranslator().translate(condition),
Arrays.stream(variables).mapToObj(CorrelationId::new).collect(Collectors.toSet()),
- joinType,
- semiDone);
+ joinType);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ProjectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ProjectNode.java
index 7a157df..56c60df 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ProjectNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ProjectNode.java
@@ -17,11 +17,18 @@
package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.serialize.expression.Expression;
import org.apache.ignite.internal.processors.query.calcite.serialize.expression.RexToExpTranslator;
import org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.util.typedef.F;
/**
@@ -42,8 +49,14 @@ public class ProjectNode extends RelGraphNode {
}
@Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
- return IgniteProject.create(F.first(children),
- ctx.getExpressionTranslator().translate(projects),
- dataType.toRelDataType(ctx.getTypeFactory()));
+ RelNode input = F.first(children);
+ List<RexNode> projects = ctx.getExpressionTranslator().translate(this.projects);
+ RelOptCluster cluster = input.getCluster();
+ RelMetadataQuery mq = cluster.getMetadataQuery();
+
+ RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
+ .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.project(mq, input, projects));
+
+ return new IgniteProject(cluster, traits, input, projects, dataType.toRelDataType(ctx.getTypeFactory()));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
index 24534ec..96bc0dd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
@@ -21,8 +21,8 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
-import org.apache.ignite.internal.processors.query.calcite.splitter.SourceImpl;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelSourceImpl;
/**
@@ -30,16 +30,16 @@ import org.apache.ignite.internal.processors.query.calcite.splitter.SourceImpl;
*/
public class ReceiverNode extends RelGraphNode {
private final DataType dataType;
- private final Source source;
+ private final RelSource source;
- private ReceiverNode(RelTraitSet traits, DataType dataType, Source source) {
+ private ReceiverNode(RelTraitSet traits, DataType dataType, RelSource source) {
super(traits);
this.dataType = dataType;
this.source = source;
}
public static ReceiverNode create(IgniteReceiver rel) {
- Source source = new SourceImpl(rel.source().exchangeId(), rel.source().mapping());
+ RelSource source = new RelSourceImpl(rel.source().exchangeId(), rel.source().mapping());
return new ReceiverNode(rel.getTraitSet(), DataType.fromType(rel.getRowType()), source);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
index 6e534d2..d935255 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
@@ -28,15 +28,15 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
import org.apache.ignite.internal.processors.query.calcite.serialize.expression.RexToExpTranslator;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-public class RelToGraphConverter {
+public class RelToGraphConverter implements RelOp<IgniteRel, RelGraph> {
private final RexToExpTranslator rexTranslator = new RexToExpTranslator();
private RelGraph graph;
@@ -52,7 +52,7 @@ public class RelToGraphConverter {
}
}
- private final class Implementor implements RelImplementor<Item> {
+ private final class Implementor implements org.apache.ignite.internal.processors.query.calcite.rel.Implementor<Item> {
@Override public Item implement(IgniteFilter rel) {
return new Item(graph.addNode(curParent, FilterNode.create(rel, rexTranslator)), Commons.cast(rel.getInputs()));
}
@@ -86,7 +86,7 @@ public class RelToGraphConverter {
}
}
- public RelGraph convert(IgniteRel root) {
+ @Override public RelGraph go(IgniteRel root) {
graph = new RelGraph();
Implementor implementor = new Implementor();
@@ -99,7 +99,7 @@ public class RelToGraphConverter {
curParent = item.parentId;
for (IgniteRel child : item.children) {
- stack.push(child.implement(implementor));
+ stack.push(implementor.go(child));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
index 5164eed..2940015 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
@@ -17,18 +17,24 @@
package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
public class SenderNode extends RelGraphNode {
- private final Target target;
+ private final RelTarget target;
- private SenderNode(Target target) {
+ private SenderNode(RelTarget target) {
this.target = target;
}
@@ -37,6 +43,14 @@ public class SenderNode extends RelGraphNode {
}
@Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
- return IgniteSender.create(F.first(children), target);
+ RelNode input = F.first(children);
+ RelOptCluster cluster = input.getCluster();
+ RelMetadataQuery mq = cluster.getMetadataQuery();
+
+ RelTraitSet traits = cluster.traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution._distribution(input, mq));
+
+ return new IgniteSender(cluster, traits, input, target);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SerializedTraits.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SerializedTraits.java
index b771bfe..94efe86 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SerializedTraits.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SerializedTraits.java
@@ -17,49 +17,39 @@
package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
*/
public class SerializedTraits implements Serializable {
- private static final Byte IGNITE_CONVENTION = 0;
+ private static final Byte CONVENTION = 0;
private final List<Serializable> traits;
public SerializedTraits(RelTraitSet traits) {
- this.traits = translate(traits);
+ this.traits = Commons.transform(traits, this::toSerializable);
}
public RelTraitSet toTraitSet(RelOptCluster cluster) {
RelTraitSet traits = cluster.traitSet();
- for (Serializable trait : this.traits) {
- traits.replace(fromSerializable(trait));
- }
+ for (Serializable trait : this.traits)
+ traits = traits.replace(fromSerializable(trait)); // RelTraitSet is immutable: replace() returns a new set, so keep the result.
return traits.simplify();
}
- private List<Serializable> translate(List<RelTrait> traits) {
- ArrayList<Serializable> res = new ArrayList<>(traits.size());
- for (RelTrait trait : traits) {
- res.add(toSerializable(trait));
- }
-
- return res;
- }
-
private Serializable toSerializable(RelTrait trait) {
if (trait instanceof Serializable)
return (Serializable) trait;
- if (trait == IgniteRel.IGNITE_CONVENTION)
- return IGNITE_CONVENTION;
+ if (trait == IgniteConvention.INSTANCE)
+ return CONVENTION;
throw new AssertionError();
}
@@ -67,8 +57,8 @@ public class SerializedTraits implements Serializable {
private RelTrait fromSerializable(Serializable trait) {
if (trait instanceof RelTrait)
return (RelTrait) trait;
- if (IGNITE_CONVENTION.equals(trait))
- return IgniteRel.IGNITE_CONVENTION;
+ if (CONVENTION.equals(trait))
+ return IgniteConvention.INSTANCE;
throw new AssertionError();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
index f9e4a8d..097a8fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
import java.util.List;
+import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
@@ -26,15 +27,18 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
public class TableScanNode extends RelGraphNode {
private final List<String> tableName;
- private TableScanNode(List<String> tableName) {
+ private TableScanNode(RelTraitSet traits, List<String> tableName) {
+ super(traits);
this.tableName = tableName;
}
public static TableScanNode create(IgniteTableScan rel) {
- return new TableScanNode(rel.getTable().getQualifiedName());
+ return new TableScanNode(rel.getTraitSet(), rel.getTable().getQualifiedName());
}
@Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
- return ctx.getSchema().getTableForMember(tableName).toRel(ctx);
+ return new IgniteTableScan(ctx.getCluster(),
+ traitSet.toTraitSet(ctx.getCluster()),
+ ctx.getSchema().getTableForMember(tableName));
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
similarity index 94%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
index 58f7146..a065dbe 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.util;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
import org.apache.calcite.rel.RelNode;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index e523f50..c812f66 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -27,13 +27,13 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
-public class Fragment implements Source {
+public class Fragment implements RelSource {
private static final AtomicLong ID_GEN = new AtomicLong();
private final long exchangeId = ID_GEN.getAndIncrement();
@@ -47,7 +47,6 @@ public class Fragment implements Source {
}
public void init(PlannerContext ctx, RelMetadataQuery mq) {
-
FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
if (info.mapping() == null)
@@ -55,12 +54,12 @@ public class Fragment implements Source {
else
mapping = info.mapping().deduplicate();
- ImmutableList<Pair<IgniteReceiver, Source>> sources = info.sources();
+ ImmutableList<Pair<IgniteReceiver, RelSource>> sources = info.sources();
if (!F.isEmpty(sources)) {
- for (Pair<IgniteReceiver, Source> input : sources) {
+ for (Pair<IgniteReceiver, RelSource> input : sources) {
IgniteReceiver receiver = input.left;
- Source source = input.right;
+ RelSource source = input.right;
source.init(mapping, receiver.distribution(), ctx, mq);
}
@@ -71,10 +70,10 @@ public class Fragment implements Source {
return exchangeId;
}
- @Override public void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
+ @Override public void init(NodesMapping mapping, IgniteDistribution distribution, PlannerContext ctx, RelMetadataQuery mq) {
assert remote();
- ((IgniteSender) root).init(new TargetImpl(exchangeId, mapping, distribution));
+ ((IgniteSender) root).target(new RelTargetImpl(exchangeId, mapping, distribution));
init(ctx, mq);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index fd0c75f..4a8ac5d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -26,7 +26,6 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQ
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.util.Edge;
import org.apache.ignite.internal.util.typedef.F;
/**
@@ -62,11 +61,11 @@ public class QueryPlan {
RelOptCluster cluster = child.getCluster();
RelTraitSet traitSet = child.getTraitSet();
- IgniteSender sender = new IgniteSender(cluster, traitSet, child);
- Fragment fragment = new Fragment(sender);
+ Fragment fragment = new Fragment(new IgniteSender(cluster, traitSet, child));
+
fragments.add(fragment);
- parent.replaceInput(edge.childIdx(), new IgniteReceiver(cluster, traitSet, sender.getRowType(), fragment));
+ parent.replaceInput(edge.childIdx(), new IgniteReceiver(cluster, traitSet, child.getRowType(), fragment));
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
similarity index 88%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
index e0108b7..b4eb49e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
@@ -19,12 +19,12 @@ package org.apache.ignite.internal.processors.query.calcite.splitter;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
/**
*
*/
-public interface Source {
+public interface RelSource {
/**
* @return Exchange id, has to be unique in scope of query.
*/
@@ -41,7 +41,7 @@ public interface Source {
* @param ctx Context.
* @param mq Metadata query instance.
*/
- default void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
- // No-op.
+ default void init(NodesMapping mapping, IgniteDistribution distribution, PlannerContext ctx, RelMetadataQuery mq) {
+ // No-op
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
similarity index 90%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
index 44b271d..b0ca757 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
@@ -22,11 +22,11 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
/**
*
*/
-public class SourceImpl implements Source, Serializable {
+public class RelSourceImpl implements RelSource, Serializable {
private final long exchangeId;
private final NodesMapping mapping;
- public SourceImpl(long exchangeId, NodesMapping mapping) {
+ public RelSourceImpl(long exchangeId, NodesMapping mapping) {
this.exchangeId = exchangeId;
this.mapping = mapping;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
similarity index 91%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
index fecea20..b031b8e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
/**
*
*/
-public interface Target {
+public interface RelTarget {
long exchangeId();
NodesMapping mapping();
- DistributionTrait distribution();
+ IgniteDistribution distribution();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
similarity index 81%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
index a69a8f1..a3c9b29 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
@@ -18,17 +18,17 @@ package org.apache.ignite.internal.processors.query.calcite.splitter;
import java.io.Serializable;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
/**
*
*/
-public class TargetImpl implements Target, Serializable {
+public class RelTargetImpl implements RelTarget, Serializable {
private final long exchangeId;
private final NodesMapping mapping;
- private final DistributionTrait distribution;
+ private final IgniteDistribution distribution;
- public TargetImpl(long exchangeId, NodesMapping mapping, DistributionTrait distribution) {
+ public RelTargetImpl(long exchangeId, NodesMapping mapping, IgniteDistribution distribution) {
this.exchangeId = exchangeId;
this.mapping = mapping;
this.distribution = distribution;
@@ -42,7 +42,7 @@ public class TargetImpl implements Target, Serializable {
return mapping;
}
- @Override public DistributionTrait distribution() {
+ @Override public IgniteDistribution distribution() {
return distribution;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 53ae9df..f2ea67a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -25,8 +25,8 @@ import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelShuttle;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle;
/**
*
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AffinityFactory.java
similarity index 61%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AffinityFactory.java
index 863f93c..f1dbbac 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AffinityFactory.java
@@ -16,7 +16,6 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
-import java.io.ObjectStreamException;
import java.util.List;
import java.util.UUID;
import java.util.function.ToIntFunction;
@@ -29,29 +28,19 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
*
*/
-public final class HashFunctionFactory extends AbstractDestinationFunctionFactory {
- public static final DestinationFunctionFactory INSTANCE = new HashFunctionFactory();
+public final class AffinityFactory extends AbstractDestinationFunctionFactory {
+ private final int cacheId;
+ private final Object key;
- @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
- assert m != null && !F.isEmpty(m.assignments());
-
- int[] fields = k.toIntArray();
-
- ToIntFunction<Object> hashFun = r -> {
- Object[] row = (Object[]) r;
-
- if (row == null)
- return 0;
-
- int hash = 1;
-
- for (int i : fields)
- hash = 31 * hash + (row[i] == null ? 0 : row[i].hashCode());
+ public AffinityFactory(int cacheId, Object key) {
+ this.cacheId = cacheId;
+ this.key = key;
+ }
- return hash;
- };
+ @Override public DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys) {
+ assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
- List<List<UUID>> assignments = m.assignments();
+ List<List<UUID>> assignments = mapping.assignments();
if (U.assertionsEnabled()) {
for (List<UUID> assignment : assignments) {
@@ -59,14 +48,13 @@ public final class HashFunctionFactory extends AbstractDestinationFunctionFactor
}
}
- return r -> assignments.get(hashFun.applyAsInt(r) % assignments.size());
- }
+ ToIntFunction<Object> rowToPart = ctx.kernalContext()
+ .cache().context().cacheContext(cacheId).affinity()::partition;
- @Override public Object key() {
- return "HashFunctionFactory";
+ return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
}
- private Object readResolve() throws ObjectStreamException {
- return INSTANCE;
+ @Override public Object key() {
+ return key;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 0fe775a..5972a11 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -16,43 +16,59 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import com.google.common.collect.Ordering;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
+import org.apache.calcite.plan.RelMultipleTrait;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.mapping.Mappings;
+
+import static org.apache.calcite.rel.RelDistribution.Type.ANY;
+import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.RANDOM_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.RANGE_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.ROUND_ROBIN_DISTRIBUTED;
/**
*
*/
-public final class DistributionTrait implements RelTrait, Serializable {
- private DistributionType type;
+public final class DistributionTrait implements IgniteDistribution, Serializable {
+ private static final Ordering<Iterable<Integer>> ORDERING =
+ Ordering.<Integer>natural().lexicographical();
+
+ private RelDistribution.Type type;
private ImmutableIntList keys;
private DestinationFunctionFactory functionFactory;
public DistributionTrait() {
}
- public DistributionTrait(DistributionType type, ImmutableIntList keys, DestinationFunctionFactory functionFactory) {
+ public DistributionTrait(RelDistribution.Type type, ImmutableIntList keys, DestinationFunctionFactory functionFactory) {
+ if (type == RANGE_DISTRIBUTED || type == ROUND_ROBIN_DISTRIBUTED)
+ throw new IllegalArgumentException("Distribution type " + type + " is unsupported.");
+
this.type = type;
this.keys = keys;
this.functionFactory = functionFactory;
}
- public DistributionType type() {
+ @Override public RelDistribution.Type getType() {
return type;
}
- public DestinationFunctionFactory destinationFunctionFactory() {
+ @Override public DestinationFunctionFactory destinationFunctionFactory() {
return functionFactory;
}
- public ImmutableIntList keys() {
+ @Override public ImmutableIntList getKeys() {
return keys;
}
@@ -65,7 +81,9 @@ public final class DistributionTrait implements RelTrait, Serializable {
if (o instanceof DistributionTrait) {
DistributionTrait that = (DistributionTrait) o;
- return type == that.type() && Objects.equals(keys, that.keys);
+ return type == that.type
+ && Objects.equals(keys, that.keys)
+ && Objects.equals(functionFactory.key(), that.functionFactory.key());
}
return false;
@@ -76,10 +94,10 @@ public final class DistributionTrait implements RelTrait, Serializable {
}
@Override public String toString() {
- return type + (type == DistributionType.HASH ? String.valueOf(keys) : "");
+ return type + (type == Type.HASH_DISTRIBUTED ? "[" + functionFactory.key() + "]" + keys : "");
}
- @Override public RelTraitDef getTraitDef() {
+ @Override public DistributionTraitDef getTraitDef() {
return DistributionTraitDef.INSTANCE;
}
@@ -92,15 +110,15 @@ public final class DistributionTrait implements RelTrait, Serializable {
DistributionTrait other = (DistributionTrait) trait;
- if (other.type() == DistributionType.ANY)
+ if (other.getType() == ANY)
return true;
- if (type() == other.type())
- return type() != DistributionType.HASH
+ if (getType() == other.getType())
+ return getType() != HASH_DISTRIBUTED
|| (Objects.equals(keys, other.keys)
&& Objects.equals(functionFactory, other.functionFactory));
- return other.type() == DistributionType.RANDOM && type() == DistributionType.HASH;
+ return other.getType() == RANDOM_DISTRIBUTED && getType() == HASH_DISTRIBUTED;
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -110,7 +128,7 @@ public final class DistributionTrait implements RelTrait, Serializable {
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- type = (DistributionType) in.readObject();
+ type = (Type) in.readObject();
keys = ImmutableIntList.of((int[])in.readObject());
functionFactory = (DestinationFunctionFactory) in.readObject();
}
@@ -118,4 +136,39 @@ public final class DistributionTrait implements RelTrait, Serializable {
private Object readResolve() throws ObjectStreamException {
return DistributionTraitDef.INSTANCE.canonize(this);
}
+
+ /**
+ * Re-maps the distribution keys of this trait through the given mapping.
+ *
+ * @param mapping Target mapping to project the key columns through.
+ * @return This trait when it has no keys; otherwise a hash distribution on the projected keys that keeps the
+ * current destination function factory, or the random distribution when the number of projected keys differs
+ * from the number of original keys.
+ */
+ @Override public IgniteDistribution apply(Mappings.TargetMapping mapping) {
+ if (keys.isEmpty())
+ return this;
+
+ // A trait that carries keys is expected to be a hash distribution.
+ assert type == HASH_DISTRIBUTED;
+
+ List<Integer> newKeys = IgniteDistributions.projectDistributionKeys(mapping, keys);
+
+ // A size mismatch means the keys were not fully projected, so key-based placement can no longer be claimed.
+ return newKeys.size() == keys.size() ? IgniteDistributions.hash(newKeys, functionFactory) :
+ IgniteDistributions.random();
+ }
+
+ /** @return {@code true} if the type of this distribution is {@code ANY}. */
+ @Override public boolean isTop() {
+ return type == Type.ANY;
+ }
+
+ /**
+ * Orders distribution traits. Traits of the same HASH or RANGE type are ordered by their keys
+ * (lexicographically) and then by the hash code of the destination function factory key; all other
+ * combinations are ordered by the distribution type.
+ *
+ * @param o Trait to compare with; it is cast to {@link IgniteDistribution} without a check.
+ * @return Negative, zero or positive value according to the ordering described above.
+ */
+ @Override public int compareTo(RelMultipleTrait o) {
+ // TODO is this method really needed??
+
+ final IgniteDistribution distribution = (IgniteDistribution) o;
+
+ // NOTE(review): the three-argument constructor rejects RANGE_DISTRIBUTED, so the RANGE check below looks
+ // reachable only for instances created another way (e.g. deserialization) - confirm.
+ if (type == distribution.getType()
+ && (type == Type.HASH_DISTRIBUTED
+ || type == Type.RANGE_DISTRIBUTED)) {
+ int cmp = ORDERING.compare(getKeys(), distribution.getKeys());
+
+ // Tie-break on hash codes of the factory keys; distinct keys with equal hash codes compare as 0.
+ if (cmp == 0)
+ cmp = Integer.compare(functionFactory.key().hashCode(), distribution.destinationFunctionFactory().key().hashCode());
+
+ return cmp;
+ }
+
+ return type.compareTo(distribution.getType());
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index 22e8815..306b7aa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -16,42 +16,51 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
/**
*
*/
-public class DistributionTraitDef extends RelTraitDef<DistributionTrait> {
+public class DistributionTraitDef extends RelTraitDef<IgniteDistribution> {
/** */
public static final DistributionTraitDef INSTANCE = new DistributionTraitDef();
- @Override public Class<DistributionTrait> getTraitClass() {
- return DistributionTrait.class;
+ @Override public Class<IgniteDistribution> getTraitClass() {
+ return IgniteDistribution.class;
}
@Override public String getSimpleName() {
return "distr";
}
- @Override public RelNode convert(RelOptPlanner planner, RelNode rel, DistributionTrait targetDist, boolean allowInfiniteCostConverters) {
- DistributionTrait srcDist = rel.getTraitSet().getTrait(INSTANCE);
+ @Override public RelNode convert(RelOptPlanner planner, RelNode rel, IgniteDistribution targetDist, boolean allowInfiniteCostConverters) {
+ if (rel.getConvention() == Convention.NONE)
+ return null;
+
+ RelDistribution srcDist = rel.getTraitSet().getTrait(INSTANCE);
- // Source and Target have the same trait.
- if (srcDist.equals(targetDist))
+ if (srcDist == targetDist) // has to be interned
return rel;
- if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
- return null;
+ switch(targetDist.getType()){
+ case HASH_DISTRIBUTED:
+ case BROADCAST_DISTRIBUTED:
+ case SINGLETON:
+ Exchange exchange = new IgniteExchange(rel.getCluster(), rel.getTraitSet().replace(targetDist), rel, targetDist);
+ RelNode newRel = planner.register(exchange, rel);
+ RelTraitSet newTraits = rel.getTraitSet().replace(targetDist);
+
+ if (!newRel.getTraitSet().equals(newTraits))
+ newRel = planner.changeTraits(newRel, newTraits);
- switch(targetDist.type()){
- case HASH:
- case BROADCAST:
- case SINGLE:
- return new IgniteExchange(rel.getCluster(), rel.getTraitSet().replace(targetDist), rel);
+ return newRel;
case ANY:
return rel;
default:
@@ -59,11 +68,11 @@ public class DistributionTraitDef extends RelTraitDef<DistributionTrait> {
}
}
- @Override public boolean canConvert(RelOptPlanner planner, DistributionTrait fromTrait, DistributionTrait toTrait) {
+ @Override public boolean canConvert(RelOptPlanner planner, IgniteDistribution fromTrait, IgniteDistribution toTrait) {
return true;
}
- @Override public DistributionTrait getDefault() {
+ @Override public IgniteDistribution getDefault() {
return IgniteDistributions.any();
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
index 863f93c..82c7325 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -63,7 +63,7 @@ public final class HashFunctionFactory extends AbstractDestinationFunctionFactor
}
@Override public Object key() {
- return "HashFunctionFactory";
+ return "DefaultHashFunction";
}
private Object readResolve() throws ObjectStreamException {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
similarity index 66%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
index ad710aa..5f3b175 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
@@ -14,14 +14,15 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.metadata;
+package org.apache.ignite.internal.processors.query.calcite.trait;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.util.mapping.Mappings;
/**
*
*/
-public interface TableDistributionService {
- DistributionTrait distribution(int cacheId, RowType rowType);
+public interface IgniteDistribution extends RelDistribution {
+ DestinationFunctionFactory destinationFunctionFactory();
+ @Override IgniteDistribution apply(Mappings.TargetMapping mapping);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 6a9e76d..3e3aa55 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -20,59 +20,253 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
+import static org.apache.calcite.rel.RelDistribution.Type.ANY;
+import static org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.RANDOM_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.SINGLETON;
+import static org.apache.calcite.rel.core.JoinRelType.INNER;
+import static org.apache.calcite.rel.core.JoinRelType.LEFT;
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
/**
*
*/
public class IgniteDistributions {
- private static final DistributionTrait BROADCAST = new DistributionTrait(DistributionType.BROADCAST, ImmutableIntList.of(), AllTargetsFactory.INSTANCE);
- private static final DistributionTrait SINGLE = new DistributionTrait(DistributionType.SINGLE, ImmutableIntList.of(), SingleTargetFactory.INSTANCE);
- private static final DistributionTrait RANDOM = new DistributionTrait(DistributionType.RANDOM, ImmutableIntList.of(), RandomTargetFactory.INSTANCE);
- private static final DistributionTrait ANY = new DistributionTrait(DistributionType.ANY, ImmutableIntList.of(), NoOpFactory.INSTANCE);
+ private static final int BEST_CNT = 3;
- public static DistributionTrait any() {
- return ANY;
+ private static final IgniteDistribution BROADCAST_DISTR = new DistributionTrait(BROADCAST_DISTRIBUTED, ImmutableIntList.of(), AllTargetsFactory.INSTANCE);
+ private static final IgniteDistribution SINGLETON_DISTR = new DistributionTrait(SINGLETON, ImmutableIntList.of(), SingleTargetFactory.INSTANCE);
+ private static final IgniteDistribution RANDOM_DISTR = new DistributionTrait(RANDOM_DISTRIBUTED, ImmutableIntList.of(), RandomTargetFactory.INSTANCE);
+ private static final IgniteDistribution ANY_DISTR = new DistributionTrait(ANY, ImmutableIntList.of(), NoOpFactory.INSTANCE);
+
+ public static IgniteDistribution any() {
+ return ANY_DISTR;
+ }
+
+ public static IgniteDistribution random() {
+ return RANDOM_DISTR;
+ }
+
+ public static IgniteDistribution single() {
+ return SINGLETON_DISTR;
+ }
+
+ public static IgniteDistribution broadcast() {
+ return BROADCAST_DISTR;
}
- public static DistributionTrait random() {
- return RANDOM;
+    /**
+     * Creates a canonized hash distribution on the given keys that uses {@code HashFunctionFactory.INSTANCE}.
+     *
+     * @param keys Indexes of the distribution key columns.
+     * @return Canonized hash distribution trait.
+     */
+    public static IgniteDistribution hash(List<Integer> keys) {
+        ImmutableIntList keyList = ImmutableIntList.copyOf(keys);
+        DistributionTrait trait = new DistributionTrait(HASH_DISTRIBUTED, keyList, HashFunctionFactory.INSTANCE);
+
+        return DistributionTraitDef.INSTANCE.canonize(trait);
+    }
+
+    /**
+     * Creates a canonized hash distribution on the given keys with an explicit destination function factory.
+     *
+     * @param keys Indexes of the distribution key columns.
+     * @param factory Destination function factory to attach to the trait.
+     * @return Canonized hash distribution trait.
+     */
+    public static IgniteDistribution hash(List<Integer> keys, DestinationFunctionFactory factory) {
+        ImmutableIntList keyList = ImmutableIntList.copyOf(keys);
+        DistributionTrait trait = new DistributionTrait(HASH_DISTRIBUTED, keyList, factory);
+
+        return DistributionTraitDef.INSTANCE.canonize(trait);
+    }
+
+    /**
+     * Suggests distributions for a join whose inputs have the given distributions.
+     *
+     * @param leftIn Distribution of the left input.
+     * @param rightIn Distribution of the right input.
+     * @param joinInfo Join keys information.
+     * @param joinType Join type.
+     * @return At most {@code BEST_CNT} suggestions with the fewest required exchanges.
+     */
+    public static List<BiSuggestion> suggestJoin(IgniteDistribution leftIn, IgniteDistribution rightIn,
+        JoinInfo joinInfo, JoinRelType joinType) {
+        ArrayList<BiSuggestion> candidates = suggestJoin0(leftIn, rightIn, joinInfo, joinType);
+
+        return topN(candidates, BEST_CNT);
+    }
+
+    /**
+     * Suggests distributions for a join when each input may be produced with several distributions.
+     * Every pair of input distributions is examined and the distinct suggestions are collected.
+     *
+     * @param leftIn Possible distributions of the left input.
+     * @param rightIn Possible distributions of the right input.
+     * @param joinInfo Join keys information.
+     * @param joinType Join type.
+     * @return At most {@code BEST_CNT} suggestions with the fewest required exchanges.
+     */
+    public static List<BiSuggestion> suggestJoin(List<IgniteDistribution> leftIn, List<IgniteDistribution> rightIn,
+        JoinInfo joinInfo, JoinRelType joinType) {
+        HashSet<BiSuggestion> suggestions = new HashSet<>();
+
+        int bestCnt = 0;
+
+        for (IgniteDistribution leftIn0 : leftIn) {
+            for (IgniteDistribution rightIn0 : rightIn) {
+                for (BiSuggestion suggest : suggestJoin0(leftIn0, rightIn0, joinInfo, joinType)) {
+                    // Once BEST_CNT distinct exchange-free suggestions are collected nothing cheaper can show up,
+                    // so stop searching. The result of topN() used to be discarded here, which made this
+                    // early exit a no-op.
+                    if (suggestions.add(suggest) && suggest.needExchange == 0 && (++bestCnt) == BEST_CNT)
+                        return topN(new ArrayList<>(suggestions), BEST_CNT);
+                }
+            }
+        }
+
+        return topN(new ArrayList<>(suggestions), BEST_CNT);
+    }
- public static DistributionTrait single() {
- return SINGLE;
+ /**
+ * Builds all candidate distribution combinations for a join of two inputs. Each candidate holds the
+ * distributions the inputs should have, the resulting output distribution and the number of inputs whose
+ * current distribution does not satisfy the required one.
+ *
+ * @param leftIn Current distribution of the left input.
+ * @param rightIn Current distribution of the right input.
+ * @param joinInfo Join keys information.
+ * @param joinType Join type.
+ * @return Unsorted list of candidates; broadcast and single candidates are always present.
+ */
+ private static ArrayList<BiSuggestion> suggestJoin0(IgniteDistribution leftIn, IgniteDistribution rightIn,
+ JoinInfo joinInfo, JoinRelType joinType) {
+ /*
+ * Distributions table:
+ *
+ * ===============INNER JOIN==============
+ * hash + hash = hash
+ * broadcast + hash = hash
+ * hash + broadcast = hash
+ * broadcast + broadcast = broadcast
+ * single + single = single
+ *
+ * ===============LEFT JOIN===============
+ * hash + hash = hash
+ * hash + broadcast = hash
+ * broadcast + broadcast = broadcast
+ * single + single = single
+ *
+ * ===============RIGHT JOIN==============
+ * hash + hash = hash
+ * broadcast + hash = hash
+ * broadcast + broadcast = broadcast
+ * single + single = single
+ *
+ * ===========FULL JOIN/CROSS JOIN========
+ * broadcast + broadcast = broadcast
+ * single + single = single
+ *
+ *
+ * others require redistribution
+ */
+
+ ArrayList<BiSuggestion> res = new ArrayList<>();
+
+ IgniteDistribution out, left, right;
+
+ // Hash-based candidates are produced for LEFT and RIGHT joins and for INNER joins that have join keys.
+ if (joinType == LEFT || joinType == RIGHT || (joinType == INNER && !F.isEmpty(joinInfo.keys()))) {
+ HashSet<DestinationFunctionFactory> factories = U.newHashSet(3);
+
+ // Reuse the factory of an input whose keys already match the join keys on its side.
+ // NOTE(review): only the keys are compared, not the input distribution type, so an input without keys
+ // would contribute its factory when the join keys are empty too - confirm this is intended.
+ if (leftIn.getKeys().equals(joinInfo.leftKeys))
+ factories.add(leftIn.destinationFunctionFactory());
+
+ if (rightIn.getKeys().equals(joinInfo.rightKeys))
+ factories.add(rightIn.destinationFunctionFactory());
+
+ // The HashFunctionFactory singleton is always considered.
+ factories.add(HashFunctionFactory.INSTANCE);
+
+ for (DestinationFunctionFactory factory : factories) {
+ // The output is always hashed by the left join keys.
+ out = hash(joinInfo.leftKeys, factory);
+
+ // hash + hash
+ left = hash(joinInfo.leftKeys, factory); right = hash(joinInfo.rightKeys, factory);
+ add(res, out, leftIn, rightIn, left, right);
+
+ // hash + broadcast
+ if (joinType == INNER || joinType == LEFT) {
+ left = hash(joinInfo.leftKeys, factory); right = broadcast();
+ add(res, out, leftIn, rightIn, left, right);
+ }
+
+ // broadcast + hash
+ if (joinType == INNER || joinType == RIGHT) {
+ left = broadcast(); right = hash(joinInfo.rightKeys, factory);
+ add(res, out, leftIn, rightIn, left, right);
+ }
+ }
+ }
+
+ // broadcast + broadcast and single + single are added for every join type.
+ out = left = right = broadcast();
+ add(res, out, leftIn, rightIn, left, right);
+
+ out = left = right = single();
+ add(res, out, leftIn, rightIn, left, right);
+
+ return res;
}
- public static DistributionTrait broadcast() {
- return BROADCAST;
+    /**
+     * Appends a suggestion to the destination list, counting how many of the inputs do not satisfy the
+     * distribution requested for them.
+     *
+     * @param dst Destination list.
+     * @param out Output distribution of the suggestion.
+     * @param left Current distribution of the left input.
+     * @param right Current distribution of the right input.
+     * @param newLeft Distribution requested for the left input.
+     * @param newRight Distribution requested for the right input.
+     * @return Number of inputs (0, 1 or 2) that do not satisfy the requested distribution.
+     */
+    private static int add(ArrayList<BiSuggestion> dst, IgniteDistribution out, IgniteDistribution left, IgniteDistribution right,
+        IgniteDistribution newLeft, IgniteDistribution newRight) {
+        boolean leftOk = left.satisfies(newLeft);
+        boolean rightOk = right.satisfies(newRight);
+
+        int exchanges = (leftOk ? 0 : 1) + (rightOk ? 0 : 1);
+
+        dst.add(new BiSuggestion(out, newLeft, newRight, exchanges));
+
+        return exchanges;
     }
- public static DistributionTrait hash(List<Integer> keys) {
- return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), HashFunctionFactory.INSTANCE);
+    /**
+     * Sorts the given list in place and returns its first {@code n} elements.
+     *
+     * @param src Suggestions; the list is sorted by this call.
+     * @param n Maximum number of elements to return.
+     * @return The list itself when it has at most {@code n} elements, otherwise a sub-list view of its head.
+     */
+    private static List<BiSuggestion> topN(ArrayList<BiSuggestion> src, int n) {
+        Collections.sort(src);
+
+        if (src.size() > n)
+            return src.subList(0, n);
+
+        return src;
     }
- public static DistributionTrait hash(List<Integer> keys, DestinationFunctionFactory factory) {
- return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), factory);
+    /**
+     * Projects distribution keys through a mapping: every key is replaced with the index of the first
+     * target whose source is that key.
+     *
+     * @param mapping Target mapping.
+     * @param keys Distribution keys expressed as source indexes.
+     * @return Projected keys in the original order, or an empty list when the mapping has fewer targets than
+     * there are keys or when some key has no target.
+     */
+    public static List<Integer> projectDistributionKeys(Mappings.TargetMapping mapping, ImmutableIntList keys) {
+        if (keys.size() > mapping.getTargetCount())
+            return Collections.emptyList();
+
+        List<Integer> projected = new ArrayList<>(mapping.getTargetCount());
+
+        for (int i = 0; i < keys.size(); i++) {
+            int src = keys.getInt(i);
+            int target = -1;
+
+            // Look for the first target fed by this key.
+            for (int j = 0; target < 0 && j < mapping.getTargetCount(); j++) {
+                if (mapping.getSourceOpt(j) == src)
+                    target = j;
+            }
+
+            // A key that is not projected makes the whole key set unusable.
+            if (target < 0)
+                return Collections.emptyList();
+
+            projected.add(target);
+        }
+
+        return projected;
     }
- public static List<DistributionTrait> deriveDistributions(RelNode rel, RelMetadataQuery mq) {
- if (!(rel instanceof RelSubset)) {
- DistributionTrait dist = IgniteMdDistribution.distribution(rel, mq);
+ /**
+ * Suggested combination of distributions for an operator with two inputs: the distributions to request from
+ * the left and right inputs, the resulting output distribution and an exchange counter supplied by the creator.
+ */
+ public static class BiSuggestion implements Comparable<BiSuggestion> {
+ /** Output distribution. */
+ private final IgniteDistribution out;
+ /** Distribution requested for the left input. */
+ private final IgniteDistribution left;
+ /** Distribution requested for the right input. */
+ private final IgniteDistribution right;
+ /** Exchange counter used to rank suggestions. */
+ private final int needExchange;
+
+ /**
+ * @param out Output distribution.
+ * @param left Distribution requested for the left input.
+ * @param right Distribution requested for the right input.
+ * @param needExchange Exchange counter used to rank suggestions.
+ */
+ public BiSuggestion(IgniteDistribution out, IgniteDistribution left, IgniteDistribution right, int needExchange) {
+ this.out = out;
+ this.left = left;
+ this.right = right;
+ this.needExchange = needExchange;
+ }
+
+ /** @return Output distribution. */
+ public IgniteDistribution out() {
+ return out;
+ }
+
+ /** @return Distribution requested for the left input. */
+ public IgniteDistribution left() {
+ return left;
+ }
+
+ /** @return Distribution requested for the right input. */
+ public IgniteDistribution right() {
+ return right;
+ }
+
+ /** @return Exchange counter used to rank suggestions. */
+ public int needExchange() {
+ return needExchange;
+ }
- return dist.type() == DistributionType.ANY ? Collections.emptyList() : Collections.singletonList(dist);
+ /**
+ * Orders suggestions by the exchange counter only, so suggestions that are not equal may compare as 0.
+ */
+ @Override public int compareTo(@NotNull IgniteDistributions.BiSuggestion o) {
+ return Integer.compare(needExchange, o.needExchange);
}
- HashSet<DistributionTrait> res = new HashSet<>();
+ /**
+ * Distributions are compared by reference.
+ * NOTE(review): presumably relies on the traits being canonical instances - confirm.
+ */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
- for (RelNode relNode : ((RelSubset) rel).getRels())
- res.addAll(deriveDistributions(relNode, mq));
+ BiSuggestion that = (BiSuggestion) o;
- return res.isEmpty() ? Collections.emptyList() : new ArrayList<>(res);
+ if (needExchange != that.needExchange) return false;
+ if (out != that.out) return false;
+ if (left != that.left) return false;
+ return right == that.right;
+ }
+
+ /** Combines the hash codes of the three distributions with the exchange counter. */
+ @Override public int hashCode() {
+ int result = out.hashCode();
+ result = 31 * result + left.hashCode();
+ result = 31 * result + right.hashCode();
+ result = 31 * result + needExchange;
+ return result;
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 3a461cf..49d6091 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -21,28 +21,20 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.function.BiFunction;
import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptNode;
import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.NotNull;
@@ -81,43 +73,6 @@ public final class Commons {
return RelOptRule.operand(first, RelOptRule.operand(second, RelOptRule.any()));
}
- public static <T extends RelNode> RelOp<T, Boolean> transformSubset(RelOptRuleCall call, RelNode input, BiFunction<T, RelNode, RelNode> transformFun) {
- return rel -> {
- if (!(input instanceof RelSubset))
- return Boolean.FALSE;
-
- RelSubset subset = (RelSubset) input;
-
- Set<RelTraitSet> traits = subset.getRelList().stream()
- .filter(r -> r instanceof IgniteRel)
- .map(RelOptNode::getTraitSet)
- .collect(Collectors.toSet());
-
- if (traits.isEmpty())
- return Boolean.FALSE;
-
- Set<RelNode> transformed = Collections.newSetFromMap(new IdentityHashMap<>());
-
- boolean transform = Boolean.FALSE;
-
- for (RelTraitSet traitSet: traits) {
- RelNode newRel = RelOptRule.convert(subset, traitSet.simplify());
-
- if (transformed.add(newRel)) {
- RelNode out = transformFun.apply(rel, newRel);
-
- if (out != null) {
- call.transformTo(out);
-
- transform = Boolean.TRUE;
- }
- }
- }
-
- return transform;
- };
- }
-
public static <T> List<T> intersect(List<T> left, List<T> right) {
if (F.isEmpty(left) || F.isEmpty(right))
return Collections.emptyList();
@@ -171,6 +126,10 @@ public final class Commons {
return set;
}
+    /**
+     * Extracts the planner context from the planner that owns the cluster of the given relational node.
+     *
+     * @param rel Relational node.
+     * @return Planner context.
+     */
+    public static PlannerContext plannerContext(RelNode rel) {
+        Context ctx = rel.getCluster().getPlanner().getContext();
+
+        return plannerContext(ctx);
+    }
+
public static PlannerContext plannerContext(Context ctx) {
return Objects.requireNonNull(ctx.unwrap(PlannerContext.class));
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index c7a7081..5104816 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -19,14 +19,14 @@ package org.apache.ignite.internal.processors.query.calcite.util;
import java.lang.reflect.Method;
import org.apache.calcite.linq4j.tree.Types;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DistributionTraitMetadata;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DerivedDistribution;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
/**
*
*/
public enum IgniteMethod {
- DISTRIBUTION_TRAIT(DistributionTraitMetadata.class, "getDistributionTrait"),
+ DERIVED_DISTRIBUTIONS(DerivedDistribution.class, "deriveDistributions"),
FRAGMENT_INFO(FragmentMetadata.class, "getFragmentInfo");
private final Method method;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
deleted file mode 100644
index bcbe604..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.util;
-
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-
-/**
- *
- */
-public interface RelImplementor<T> {
- T implement(IgniteExchange rel);
- T implement(IgniteFilter rel);
- T implement(IgniteJoin rel);
- T implement(IgniteProject rel);
- T implement(IgniteTableScan rel);
- T implement(IgniteReceiver rel);
- T implement(IgniteSender rel);
- T implement(IgniteRel other);
-}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index a5159d3..ee4c22e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -40,19 +40,20 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.calcite.exec.ConsumerNode;
-import org.apache.ignite.internal.processors.query.calcite.exec.Interpretable;
+import org.apache.ignite.internal.processors.query.calcite.exec.ImplementorImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.Node;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
import org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue;
import org.apache.ignite.internal.processors.query.calcite.prepare.DataContextImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
+import org.apache.ignite.internal.processors.query.calcite.rel.Implementor;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.serialize.expression.Expression;
@@ -61,7 +62,6 @@ import org.apache.ignite.internal.processors.query.calcite.serialize.relation.Re
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
@@ -70,27 +70,29 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
-import static org.apache.ignite.internal.processors.query.calcite.exec.Interpretable.INTERPRETABLE;
-
/**
*
*/
-@WithSystemProperty(key = "calcite.debug", value = "true")
+//@WithSystemProperty(key = "calcite.debug", value = "true")
public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
- private static GridTestKernalContext kernalContext;
- private static CalciteQueryProcessor proc;
- private static SchemaPlus schema;
- private static List<UUID> nodes;
+ private GridTestKernalContext kernalContext;
+ private CalciteQueryProcessor proc;
+ private SchemaPlus schema;
+ private List<UUID> nodes;
+
+ private TestIgniteTable city;
+ private TestIgniteTable country;
+ private TestIgniteTable project;
+ private TestIgniteTable developer;
- @BeforeClass
- public static void setupClass() {
+ @Before
+ public void setup() {
kernalContext = new GridTestKernalContext(log);
proc = new CalciteQueryProcessor();
proc.setLogger(log);
@@ -98,61 +100,50 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
- publicSchema.addTable(new IgniteTable("Developer", "Developer",
+ developer = new TestIgniteTable("Developer", "Developer",
RowType.builder()
.keyField("id", Integer.class, true)
.field("name", String.class)
.field("projectId", Integer.class)
.field("cityId", Integer.class)
- .build()){
- @Override public Enumerable<Object[]> scan(DataContext root) {
- return Linq4j.asEnumerable(Arrays.asList(
- new Object[]{0, null, 0, "Igor", 0, 1},
- new Object[]{1, null, 1, "Roman", 0, 0}
- ));
- }
- });
+ .build(), Arrays.asList(
+ new Object[]{0, null, 0, "Igor", 0, 1},
+ new Object[]{1, null, 1, "Roman", 0, 0}
+ ));
- publicSchema.addTable(new IgniteTable("Project", "Project",
+ project = new TestIgniteTable("Project", "Project",
RowType.builder()
.keyField("id", Integer.class, true)
.field("name", String.class)
.field("ver", Integer.class)
- .build()){
- @Override public Enumerable<Object[]> scan(DataContext root) {
- return Linq4j.asEnumerable(Arrays.asList(
- new Object[]{0, null, 0, "Calcite", 1},
- new Object[]{1, null, 1, "Ignite", 1}
- ));
- }
- });
+ .build(), Arrays.asList(
+ new Object[]{0, null, 0, "Calcite", 1},
+ new Object[]{1, null, 1, "Ignite", 1}
+ ));
- publicSchema.addTable(new IgniteTable("Country", "Country",
+ country = new TestIgniteTable("Country", "Country",
RowType.builder()
.keyField("id", Integer.class, true)
.field("name", String.class)
.field("countryCode", Integer.class)
- .build()){
- @Override public Enumerable<Object[]> scan(DataContext root) {
- return Linq4j.asEnumerable(Arrays.<Object[]>asList(
- new Object[]{0, null, 0, "Russia", 7}
- ));
- }
- });
+ .build(), Arrays.<Object[]>asList(
+ new Object[]{0, null, 0, "Russia", 7}
+ ));
- publicSchema.addTable(new IgniteTable("City", "City",
+ city = new TestIgniteTable("City", "City",
RowType.builder()
.keyField("id", Integer.class, true)
.field("name", String.class)
.field("countryId", Integer.class)
- .build()){
- @Override public Enumerable<Object[]> scan(DataContext root) {
- return Linq4j.asEnumerable(Arrays.asList(
- new Object[]{0, null, 0, "Moscow", 0},
- new Object[]{1, null, 1, "Saint Petersburg", 0}
- ));
- }
- });
+ .build(), Arrays.asList(
+ new Object[]{0, null, 0, "Moscow", 0},
+ new Object[]{1, null, 1, "Saint Petersburg", 0}
+ ));
+
+ publicSchema.addTable(developer);
+ publicSchema.addTable(project);
+ publicSchema.addTable(country);
+ publicSchema.addTable(city);
schema = Frameworks
.createRootSchema(false)
@@ -366,11 +357,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
// Transformation chain
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -426,11 +417,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
assertNotNull(rel);
@@ -442,7 +433,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
plan.init(ctx);
- RelGraph graph = new RelToGraphConverter().convert((IgniteRel) plan.fragments().get(1).root());
+ RelGraph graph = new RelToGraphConverter().go((IgniteRel) plan.fragments().get(1).root());
convertedBytes = new JdkMarshaller().marshal(graph);
@@ -464,6 +455,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
@Test
public void testSplitterCollocatedPartitionedPartitioned() throws Exception {
+ Object key = new Object();
+
+ developer.identityKey(key);
+ project.identityKey(key);
+
String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
"FROM PUBLIC.Developer d JOIN (" +
"SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -504,11 +500,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -565,36 +561,17 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
// Transformation chain
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
- RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
- .replace(IgniteDistributions.single())
- .simplify();
-
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
-
- assertNotNull(relRoot);
-
- QueryPlan plan = new Splitter().go((IgniteRel) rel);
-
- assertNotNull(plan);
-
- plan.init(ctx);
+ RelTraitSet desired = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE);
- assertNotNull(plan);
-
- assertTrue(plan.fragments().size() == 2);
-
- desired = rel.getCluster().traitSetOf(INTERPRETABLE);
-
- RelNode phys = planner.transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, plan.fragments().get(1).root(), desired);
+ RelNode phys = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
assertNotNull(phys);
Map<String, Object> params = ctx.query().params(F.asMap(ContextValue.QUERY_ID.valueName(), new GridCacheVersion()));
- Interpretable.Implementor<Object[]> implementor = new Interpretable.Implementor<>(new DataContextImpl(params, ctx));
+ Implementor<Node<Object[]>> implementor = new ImplementorImpl(new DataContextImpl(params, ctx));
- Node<Object[]> exec = implementor.go(phys.getInput(0));
+ Node<Object[]> exec = implementor.go((IgniteRel) phys);
assertNotNull(exec);
@@ -625,12 +602,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- TableDistributionService ds = new TableDistributionService(){
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- return IgniteDistributions.broadcast();
- }
- };
-
MappingService ms = new MappingService() {
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -650,7 +621,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
assertNotNull(ctx);
RelTraitDef[] traitDefs = {
@@ -682,11 +653,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -706,6 +677,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
@Test
public void testSplitterCollocatedReplicatedAndPartitioned() throws Exception {
+ developer.identityKey(new Object());
+
String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
"FROM PUBLIC.Developer d JOIN (" +
"SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -713,15 +686,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- TableDistributionService ds = new TableDistributionService(){
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- if (cacheId == CU.cacheId("Project"))
- return IgniteDistributions.broadcast();
-
- return IgniteDistributions.hash(rowType.distributionKeys());
- }
- };
-
MappingService ms = new MappingService() {
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -747,7 +711,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
assertNotNull(ctx);
@@ -780,11 +744,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -804,6 +768,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
@Test
public void testSplitterPartiallyCollocated() throws Exception {
+ developer.identityKey(new Object());
+
String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
"FROM PUBLIC.Developer d JOIN (" +
"SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -811,15 +777,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.projectId = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- TableDistributionService ds = new TableDistributionService(){
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- if (cacheId == CU.cacheId("Project"))
- return IgniteDistributions.broadcast();
-
- return IgniteDistributions.hash(rowType.distributionKeys());
- }
- };
-
MappingService ms = new MappingService() {
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -845,7 +802,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
assertNotNull(ctx);
@@ -878,11 +835,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -909,12 +866,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.projectId = p.ver0 " +
"WHERE (d.projectId + 1) > ?";
- TableDistributionService ds = new TableDistributionService(){
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- return IgniteDistributions.broadcast();
- }
- };
-
MappingService ms = new MappingService() {
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -935,7 +886,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
assertNotNull(ctx);
@@ -968,11 +919,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -992,6 +943,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
@Test
public void testSplitterPartiallyReplicated1() throws Exception {
+ developer.identityKey(new Object());
+
String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
"FROM PUBLIC.Developer d JOIN (" +
"SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -999,16 +952,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
-
- TableDistributionService ds = new TableDistributionService(){
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- if (cacheId == CU.cacheId("Project"))
- return IgniteDistributions.broadcast();
-
- return IgniteDistributions.hash(rowType.distributionKeys());
- }
- };
-
MappingService ms = new MappingService() {
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -1034,7 +977,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
assertNotNull(ctx);
@@ -1067,11 +1010,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -1091,6 +1034,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
@Test
public void testSplitterPartiallyReplicated2() throws Exception {
+ developer.identityKey(new Object());
+
String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
"FROM PUBLIC.Developer d JOIN (" +
"SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -1098,16 +1043,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
"ON d.id = p.id0 " +
"WHERE (d.projectId + 1) > ?";
-
- TableDistributionService ds = new TableDistributionService() {
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- if (cacheId == CU.cacheId("Project"))
- return IgniteDistributions.broadcast();
-
- return IgniteDistributions.hash(rowType.distributionKeys());
- }
- };
-
MappingService ms = new MappingService() {
@Override public NodesMapping random(AffinityTopologyVersion topVer) {
return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -1133,7 +1068,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+ PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
assertNotNull(ctx);
@@ -1166,11 +1101,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteRel.IGNITE_CONVENTION)
+ .replace(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single())
.simplify();
- rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+ rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
}
@@ -1230,16 +1165,10 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
};
- TableDistributionService ds = new TableDistributionService() {
- @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
- return IgniteDistributions.hash(rowType.distributionKeys());
- }
- };
-
- return context(c, q, ms, ds);
+ return context(c, q, ms);
}
- private PlannerContext context(Context parent, Query query, MappingService ms, TableDistributionService ds) {
+ private PlannerContext context(Context parent, Query query, MappingService ms) {
return PlannerContext.builder()
.parentContext(parent)
.logger(log)
@@ -1248,8 +1177,28 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
.query(query)
.schema(schema)
.topologyVersion(AffinityTopologyVersion.NONE)
- .distributionService(ds)
.mappingService(ms)
.build();
}
+
+ public static class TestIgniteTable extends IgniteTable {
+ private final List<Object[]> data;
+ private Object identityKey;
+ public TestIgniteTable(String tableName, String cacheName, RowType rowType, List<Object[]> data) {
+ super(tableName, cacheName, rowType, null);
+ this.data = data;
+ }
+
+ public void identityKey(Object identityKey) {
+ this.identityKey = identityKey;
+ }
+
+ @Override public Object identityKey() {
+ return identityKey;
+ }
+
+ @Override public Enumerable<Object[]> scan(DataContext root) {
+ return Linq4j.asEnumerable(data);
+ }
+ }
}
\ No newline at end of file