You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/03/16 22:00:22 UTC
[ignite] branch ignite-12248 updated: IGNITE-12776: Calcite
integration. An exception occurred when a replicated cache is used in a
query. #7519
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 47a2da9 IGNITE-12776: Calcite integration. An exception occurred when a replicated cache is used in a query. #7519
47a2da9 is described below
commit 47a2da98851050ae7c64797cebb30bb9529d7180
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Mar 17 01:00:07 2020 +0300
IGNITE-12776: Calcite integration. An exception occurred when a replicated cache is used in a query. #7519
---
.../query/calcite/exec/LogicalRelImplementor.java | 9 +-
.../query/calcite/metadata/FragmentInfo.java | 28 ++-
.../calcite/metadata/IgniteMdFragmentInfo.java | 34 ++--
.../query/calcite/metadata/MappingServiceImpl.java | 17 +-
.../query/calcite/metadata/NodesMapping.java | 128 ++++++-------
.../metadata/OptimisticPlanningException.java | 26 +--
.../calcite/metadata/PartitionServiceImpl.java | 3 +-
.../calcite/prepare/AbstractMultiStepPlan.java | 53 ++----
.../processors/query/calcite/prepare/Edge.java | 66 -------
.../processors/query/calcite/prepare/Fragment.java | 51 +++---
.../query/calcite/prepare/FragmentSplitter.java | 198 +++++++++++++++++++++
.../query/calcite/prepare/IgnitePlanner.java | 4 +-
.../query/calcite/prepare/RelSource.java | 52 ------
.../query/calcite/prepare/RelTarget.java | 9 +-
.../{RelSourceImpl.java => RelTargetAware.java} | 31 +---
.../query/calcite/prepare/RelTargetImpl.java | 16 +-
.../processors/query/calcite/prepare/Splitter.java | 8 +-
.../query/calcite/rel/IgniteReceiver.java | 12 +-
.../processors/query/calcite/rel/IgniteSender.java | 32 +++-
.../query/calcite/rule/JoinConverter.java | 2 +-
.../calcite/serialize/RelToPhysicalConverter.java | 10 +-
.../query/calcite/trait/DistributionFunction.java | 32 +++-
.../query/calcite/trait/DistributionTraitDef.java | 27 +--
.../query/calcite/trait/IgniteDistributions.java | 14 +-
.../query/calcite/trait/Partitioned.java | 2 +-
.../processors/query/calcite/util/Commons.java | 49 +++--
.../processors/query/calcite/util/TableScan.java | 2 +-
.../query/calcite/CalciteQueryProcessorTest.java | 39 ++++
28 files changed, 526 insertions(+), 428 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 49b92ef..baea3f2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
-import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -97,12 +97,11 @@ public class LogicalRelImplementor implements IgniteRelVisitor<Node<Object[]>> {
/** {@inheritDoc} */
@Override public Node<Object[]> visit(IgniteSender rel) {
RelTarget target = rel.target();
- long targetFragmentId = target.fragmentId();
- IgniteDistribution distribution = target.distribution();
+ IgniteDistribution distribution = rel.targetDistribution();
Destination destination = distribution.function().destination(partitionService, target.mapping(), distribution.getKeys());
// Outbox fragment ID is used as exchange ID as well.
- Outbox<Object[]> outbox = new Outbox<>(ctx, exchangeService, mailboxRegistry, ctx.fragmentId(), targetFragmentId, destination);
+ Outbox<Object[]> outbox = new Outbox<>(ctx, exchangeService, mailboxRegistry, ctx.fragmentId(), target.fragmentId(), destination);
outbox.register(visit(rel.getInput()));
mailboxRegistry.register(outbox);
@@ -167,7 +166,7 @@ public class LogicalRelImplementor implements IgniteRelVisitor<Node<Object[]>> {
/** {@inheritDoc} */
@Override public Node<Object[]> visit(IgniteReceiver rel) {
- RelSource source = rel.source();
+ Fragment source = rel.source();
// Corresponding outbox fragment ID is used as exchange ID as well.
Inbox<Object[]> inbox = (Inbox<Object[]>) mailboxRegistry.register(new Inbox<>(ctx, exchangeService, mailboxRegistry, source.fragmentId(), source.fragmentId()));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index e787d66..c9bc75a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -18,10 +18,8 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
import com.google.common.collect.ImmutableList;
-import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTargetAware;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.util.typedef.F;
/**
@@ -33,7 +31,7 @@ public class FragmentInfo {
private final NodesMapping mapping;
/** */
- private final ImmutableList<Pair<IgniteReceiver, RelSource>> sources;
+ private final ImmutableList<RelTargetAware> targetAwareList;
/**
* Constructs Values leaf fragment info.
@@ -45,10 +43,10 @@ public class FragmentInfo {
/**
* Constructs Receiver leaf fragment info.
*
- * @param source Pair of a Receiver relational node and its data source information.
+ * @param targetAware A node that needs target information.
*/
- public FragmentInfo(Pair<IgniteReceiver, RelSource> source) {
- this(ImmutableList.of(source), null);
+ public FragmentInfo(RelTargetAware targetAware) {
+ this(ImmutableList.of(targetAware), null);
}
/**
@@ -63,11 +61,11 @@ public class FragmentInfo {
/**
* Used on merge of two relational node tree edges.
*
- * @param sources Pairs of underlying Receiver relational nodes and theirs data source information.
+ * @param targetAwareList nodes that need target information.
* @param mapping Nodes mapping, describing where interested data placed.
*/
- public FragmentInfo(ImmutableList<Pair<IgniteReceiver, RelSource>> sources, NodesMapping mapping) {
- this.sources = sources;
+ public FragmentInfo(ImmutableList<RelTargetAware> targetAwareList, NodesMapping mapping) {
+ this.targetAwareList = targetAwareList;
this.mapping = mapping;
}
@@ -86,10 +84,10 @@ public class FragmentInfo {
}
/**
- * @return Pairs of underlying Receiver relational nodes and theirs data source information.
+ * @return Nodes that need target information..
*/
- public ImmutableList<Pair<IgniteReceiver, RelSource>> sources() {
- return sources;
+ public ImmutableList<RelTargetAware> targetAwareList() {
+ return targetAwareList;
}
/**
@@ -100,7 +98,7 @@ public class FragmentInfo {
*/
public FragmentInfo merge(FragmentInfo other) throws LocationMappingException {
return new FragmentInfo(
- merge(sources(), other.sources()),
+ merge(targetAwareList(), other.targetAwareList()),
merge(mapping(), other.mapping()));
}
@@ -115,7 +113,7 @@ public class FragmentInfo {
NodesMapping newMapping = mapping.prune(filter);
if (newMapping != mapping)
- return new FragmentInfo(sources, newMapping);
+ return new FragmentInfo(targetAwareList, newMapping);
}
return this;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 26d5212..23bce2c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.core.Join;
@@ -29,9 +28,7 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
-import org.apache.ignite.internal.processors.query.calcite.prepare.Edge;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
@@ -110,19 +107,22 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
return left.merge(right);
}
catch (LocationMappingException e) {
+ String msg = "Failed to calculate physical distribution";
+
// a replicated cache is cheaper to redistribute
if (!left.mapping().hasPartitionedCaches())
- throw planningException(rel, e, true);
+ throw new OptimisticPlanningException(msg, rel.getLeft(), e);
else if (!right.mapping().hasPartitionedCaches())
- throw planningException(rel, e, false);
-
- // both sub-trees have partitioned sources, less cost is better
- RelOptCluster cluster = rel.getCluster();
+ throw new OptimisticPlanningException(msg, rel.getRight(), e);
+ else {
+ // both sub-trees have partitioned sources, less cost is better
+ RelOptCluster cluster = rel.getCluster();
- RelOptCost leftCost = rel.getLeft().computeSelfCost(cluster.getPlanner(), mq);
- RelOptCost rightCost = rel.getRight().computeSelfCost(cluster.getPlanner(), mq);
+ RelOptCost leftCost = rel.getLeft().computeSelfCost(cluster.getPlanner(), mq);
+ RelOptCost rightCost = rel.getRight().computeSelfCost(cluster.getPlanner(), mq);
- throw planningException(rel, e, leftCost.isLe(rightCost));
+ throw new OptimisticPlanningException(msg, leftCost.isLe(rightCost) ? rel.getLeft() : rel.getRight(), e);
+ }
}
}
@@ -130,7 +130,7 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
* See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
*/
public FragmentInfo fragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
- return new FragmentInfo(Pair.of(rel, rel.source()));
+ return new FragmentInfo(rel.source());
}
/**
@@ -156,14 +156,4 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
public static FragmentInfo _fragmentInfo(RelNode rel, RelMetadataQuery mq) {
return RelMetadataQueryEx.wrap(mq).getFragmentInfo(rel);
}
-
- /** */
- private static OptimisticPlanningException planningException(BiRel rel, Exception cause, boolean splitLeft) {
- String msg = "Failed to calculate physical distribution";
-
- if (splitLeft)
- return new OptimisticPlanningException(msg, new Edge(rel, rel.getLeft(), 0), cause);
-
- return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause);
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingServiceImpl.java
index f790c63..25c38dd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingServiceImpl.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -30,12 +32,15 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping.DEDUPLICATED;
-
/**
*
*/
public class MappingServiceImpl extends AbstractService implements MappingService {
+ /**
+ * Max nodes count, used on to-hash or to-random redistribution.
+ */
+ private static final int MAX_BUCKETS_COUNT = IgniteSystemProperties.getInteger("IGNITE_CALCITE_MAX_BUCKETS_COUNT", 1024);
+
/** */
private GridDiscoveryManager discoveryManager;
@@ -62,17 +67,19 @@ public class MappingServiceImpl extends AbstractService implements MappingServic
@Override public NodesMapping mapBalanced(@NotNull AffinityTopologyVersion topVer, int desiredCnt, @Nullable Predicate<ClusterNode> nodeFilter) {
assert desiredCnt >= 0;
- List<ClusterNode> nodes = discoveryManager.discoCache(topVer).serverNodes();
+ desiredCnt = desiredCnt == 0 ? MAX_BUCKETS_COUNT : Math.min(desiredCnt, MAX_BUCKETS_COUNT);
+
+ List<ClusterNode> nodes = new ArrayList<>(discoveryManager.discoCache(topVer).serverNodes());
if (nodeFilter != null)
nodes = nodes.stream().filter(nodeFilter).collect(Collectors.toList());
- if (desiredCnt != 0 && desiredCnt < nodes.size()) {
+ if (desiredCnt < nodes.size()) {
Collections.shuffle(nodes);
nodes = nodes.subList(0, desiredCnt);
}
- return new NodesMapping(Commons.transform(nodes, ClusterNode::id), null, DEDUPLICATED);
+ return new NodesMapping(Commons.transform(nodes, ClusterNode::id), null, (byte) 0);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index cd28a28..3bb89b0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -19,12 +19,10 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -64,6 +62,7 @@ public class NodesMapping implements Serializable {
/** */
private final byte flags;
+ /** */
public NodesMapping(List<UUID> nodes, List<List<UUID>> assignments, byte flags) {
this.nodes = nodes;
this.assignments = assignments;
@@ -105,22 +104,19 @@ public class NodesMapping implements Serializable {
public NodesMapping mergeWith(NodesMapping other) throws LocationMappingException {
byte flags = (byte) (this.flags | other.flags);
- if ((flags & (PARTIALLY_REPLICATED | CLIENT)) == 0)
- return new NodesMapping(U.firstNotNull(nodes, other.nodes), mergeAssignments(other, null), flags);
-
- List<UUID> nodes;
+ List<UUID> nodes = intersectReplicated(this.nodes, other.nodes);
- if (this.nodes == null)
- nodes = other.nodes;
- else if (other.nodes == null)
- nodes = this.nodes;
- else
- nodes = Commons.intersect(this.nodes, other.nodes);
+ // if there is no moving partitions both assignments are identical.
+ List<List<UUID>> assignments = (flags & HAS_MOVING_PARTITIONS) == 0
+ ? U.firstNotNull(this.assignments, other.assignments)
+ : intersectPartitioned(this.assignments, other.assignments);
- if (nodes != null && nodes.isEmpty())
- throw new LocationMappingException("Failed to map fragment to location.");
+ // In case all involved replicated caches are available on
+ // all nodes it's no need to check assignments against them.
+ if ((flags & (PARTIALLY_REPLICATED | CLIENT)) == 0)
+ return new NodesMapping(nodes, assignments, flags);
- return new NodesMapping(nodes, mergeAssignments(other, nodes), flags);
+ return new NodesMapping(nodes, intersectReplicatedPartitioned(nodes, assignments), flags);
}
/**
@@ -132,31 +128,32 @@ public class NodesMapping implements Serializable {
* @return Nodes mapping, containing nodes, that actually will be in charge of query execution.
*/
public NodesMapping deduplicate() {
- if (!excessive())
+ if ((flags & DEDUPLICATED) == DEDUPLICATED)
return this;
if (assignments == null) {
- UUID node = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+ List<List<UUID>> assignments0 = new ArrayList<>(nodes.size());
+
+ for (UUID node : nodes)
+ assignments0.add(F.asList(node));
- return new NodesMapping(Collections.singletonList(node), null, (byte)(flags | DEDUPLICATED));
+ return new NodesMapping(nodes, assignments0, (byte)(flags | DEDUPLICATED));
}
- HashSet<UUID> nodes0 = new HashSet<>();
+ HashSet<UUID> nodesSet = new HashSet<>();
+ ArrayList<UUID> nodes0 = new ArrayList<>();
List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
for (List<UUID> partNodes : assignments) {
UUID node = F.first(partNodes);
- if (node == null)
- assignments0.add(Collections.emptyList());
- else {
- assignments0.add(Collections.singletonList(node));
+ assignments0.add(F.asList(node));
+ if (node != null && nodesSet.add(node))
nodes0.add(node);
- }
}
- return new NodesMapping(new ArrayList<>(nodes0), assignments0, (byte)(flags | DEDUPLICATED));
+ return new NodesMapping(nodes0, assignments0, (byte)(flags | DEDUPLICATED));
}
/**
@@ -166,8 +163,7 @@ public class NodesMapping implements Serializable {
* @return List of partitions to scan on the given node.
*/
public int[] partitions(UUID node) {
- if (assignments == null)
- return null;
+ assert (flags & DEDUPLICATED) == DEDUPLICATED;
GridIntList parts = new GridIntList(assignments.size());
@@ -181,13 +177,6 @@ public class NodesMapping implements Serializable {
}
/**
- * @return {@code True} if mapping is excessive.
- */
- public boolean excessive() {
- return (flags & DEDUPLICATED) == 0;
- }
-
- /**
* @return {@code True} if some of involved partitioned tables are being rebalanced.
*/
public boolean hasMovingPartitions() {
@@ -218,59 +207,56 @@ public class NodesMapping implements Serializable {
}
/** */
- private List<List<UUID>> mergeAssignments(NodesMapping other, List<UUID> nodes) throws LocationMappingException {
- byte flags = (byte) (this.flags | other.flags);
- List<List<UUID>> left = assignments;
- List<List<UUID>> right = other.assignments;
+ private List<UUID> intersectReplicated(List<UUID> left, List<UUID> right) throws LocationMappingException {
+ if (left == null || right == null)
+ return U.firstNotNull(left, right);
- if (left == null && right == null)
- return null; // nothing to intersect;
+ List<UUID> nodes = Commons.intersect(right, left);
- if (left == null || right == null || (flags & HAS_MOVING_PARTITIONS) == 0) {
- List<List<UUID>> assignments = U.firstNotNull(left, right);
+ if (F.isEmpty(nodes)) // replicated caches aren't co-located on all nodes.
+ throw new LocationMappingException("Failed to map fragment to location. Replicated query parts are not co-located on all nodes");
- if (nodes == null)
- return assignments;
+ return nodes;
+ }
- List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
- HashSet<UUID> nodesSet = new HashSet<>(nodes);
+ /** */
+ private List<List<UUID>> intersectPartitioned(List<List<UUID>> left, List<List<UUID>> right) throws LocationMappingException {
+ if (left == null || right == null)
+ return U.firstNotNull(left, right);
- for (List<UUID> partNodes : assignments) {
- List<UUID> partNodes0 = new ArrayList<>(partNodes.size());
+ assert left.size() == right.size();
- for (UUID partNode : partNodes) {
- if (nodesSet.contains(partNode))
- partNodes0.add(partNode);
- }
+ List<List<UUID>> res = new ArrayList<>(left.size());
- if (partNodes0.isEmpty()) // TODO check with partition filters
- throw new LocationMappingException("Failed to map fragment to location.");
+ for (int i = 0; i < left.size(); i++) {
+ List<UUID> partNodes = Commons.intersect(left.get(i), right.get(i));
- assignments0.add(partNodes0);
- }
+ if (partNodes.isEmpty()) // TODO check with partition filters
+ throw new LocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
- return assignments0;
+ res.add(partNodes);
}
- List<List<UUID>> assignments = new ArrayList<>(left.size());
- HashSet<UUID> nodesSet = nodes != null ? new HashSet<>(nodes) : null;
+ return res;
+ }
+
+ /** */
+ private List<List<UUID>> intersectReplicatedPartitioned(List<UUID> nodes, List<List<UUID>> assignments) throws LocationMappingException {
+ if (nodes == null || assignments == null)
+ return assignments;
- for (int i = 0; i < left.size(); i++) {
- List<UUID> leftNodes = left.get(i);
- List<UUID> partNodes = new ArrayList<>(leftNodes.size());
- HashSet<UUID> rightNodesSet = new HashSet<>(right.get(i));
+ HashSet<UUID> nodesSet = new HashSet<>(nodes);
+ List<List<UUID>> res = new ArrayList<>(assignments.size());
- for (UUID partNode : leftNodes) {
- if (rightNodesSet.contains(partNode) && (nodesSet == null || nodesSet.contains(partNode)))
- partNodes.add(partNode);
- }
+ for (int i = 0; i < assignments.size(); i++) {
+ List<UUID> partNodes = Commons.intersect(nodesSet, assignments.get(i));
- if (partNodes.isEmpty())
- throw new LocationMappingException("Failed to map fragment to location.");
+ if (partNodes.isEmpty()) // TODO check with partition filters
+ throw new LocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
- assignments.add(partNodes);
+ res.add(partNodes);
}
- return assignments;
+ return res;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
index 8857b45..25dcc1f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
@@ -17,40 +17,30 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import org.apache.ignite.internal.processors.query.calcite.prepare.Edge;
+import org.apache.calcite.rel.RelNode;
/**
*
*/
public class OptimisticPlanningException extends RuntimeException{
/** */
- private final Edge edge;
+ private final RelNode node;
/**
*
* @param message Message.
- * @param edge Edge of query plan, where the exception was thrown.
+ * @param node Node of a query plan, where the exception was thrown.
* @param cause Cause.
*/
- public OptimisticPlanningException(String message, Edge edge, Throwable cause) {
+ public OptimisticPlanningException(String message, RelNode node, Throwable cause) {
super(message, cause);
- this.edge = edge;
+ this.node = node;
}
/**
- *
- * @param message Message.
- * @param edge Edge of query plan, where the exception was thrown.
- */
- public OptimisticPlanningException(String message, Edge edge) {
- super(message);
- this.edge = edge;
- }
-
- /**
- * @return Edge of query plan, where the exception was thrown.
+ * @return Node of a query plan, where the exception was thrown.
*/
- public Edge edge() {
- return edge;
+ public RelNode node() {
+ return node;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/PartitionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/PartitionServiceImpl.java
index 4c01d8d..ed85c4e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/PartitionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/PartitionServiceImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
*
@@ -53,7 +54,7 @@ public class PartitionServiceImpl extends AbstractService implements PartitionSe
/** {@inheritDoc} */
@Override public ToIntFunction<Object> partitionFunction(int cacheId) {
if (cacheId == CU.UNDEFINED_CACHE_ID)
- return k -> k == null ? 0 : k.hashCode();
+ return k -> k == null ? 0 : U.safeAbs(k.hashCode());
AffinityFunction affinity = cacheSharedContext.cacheContext(cacheId).group().affinityFunction();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 27f1963..99f316b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -18,16 +18,11 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.util.typedef.F;
/**
@@ -57,48 +52,32 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
/** {@inheritDoc} */
@Override public void init(MappingService mappingService, PlanningContext ctx) {
- int i = 0;
-
RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
- while (true) {
+ for (int i = 0, j = 0; i < fragments.size();) {
+ Fragment fragment = fragments.get(i);
+
try {
- F.first(fragments).init(mappingService, ctx, mq);
+ fragment.init(mappingService, ctx, mq);
- break;
+ i++;
}
catch (OptimisticPlanningException e) {
- if (++i > 3)
+ if (++j > 3)
throw new IgniteSQLException("Failed to map query.", e);
- Edge edge = e.edge();
-
- RelNode parent = edge.parent();
- RelNode child = edge.child();
-
- RelOptCluster cluster = child.getCluster();
- RelTraitSet traitSet = child.getTraitSet();
-
- Fragment fragment = new Fragment(new IgniteSender(cluster, traitSet, child));
-
- fragments.add(fragment);
-
- if (parent != null)
- parent.replaceInput(edge.childIndex(), new IgniteReceiver(cluster, traitSet, child.getRowType(), fragment));
- else {
- // need to fix a distribution of a root of a fragment
- int idx = 0;
+ replace(fragment, new FragmentSplitter().go(fragment, e.node(), mq));
- for (; idx < fragments.size(); idx++) {
- if (fragments.get(idx).root() == child)
- break;
- }
-
- assert idx < fragments.size();
-
- fragments.set(idx, new Fragment(new IgniteReceiver(cluster, traitSet, child.getRowType(), fragment)));
- }
+ i = 0; // restart init routine.
}
}
}
+
+ /** */
+ private void replace(Fragment fragment, List<Fragment> replacement) {
+ assert !F.isEmpty(replacement);
+
+ fragments.set(fragments.indexOf(fragment), F.first(replacement));
+ fragments.addAll(replacement.subList(1, replacement.size()));
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
deleted file mode 100644
index fe0f87b..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.prepare;
-
-import org.apache.calcite.rel.RelNode;
-
-/**
- * Describes RelNode tree edge.
- */
-public class Edge {
- /** */
- private final RelNode parent;
-
- /** */
- private final RelNode child;
-
- /** */
- private final int childIdx;
-
- /**
- * @param parent Parent node.
- * @param child Child node.
- * @param childIdx Child node index in parent.
- */
- public Edge(RelNode parent, RelNode child, int childIdx) {
- this.parent = parent;
- this.child = child;
- this.childIdx = childIdx;
- }
-
- /**
- * @return Parent node.
- */
- public RelNode parent() {
- return parent;
- }
-
- /**
- * @return Child node.
- */
- public RelNode child() {
- return child;
- }
-
- /**
- * @return Child node index in parent.
- */
- public int childIndex() {
- return childIdx;
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index b9ef7c3..43251b1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -17,20 +17,16 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo;
import org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.util.typedef.F;
@@ -41,7 +37,7 @@ import static org.apache.calcite.rel.RelDistribution.Type.SINGLETON;
/**
* Fragment of distributed query
*/
-public class Fragment implements RelSource {
+public class Fragment implements RelTargetAware {
/** */
private static final AtomicLong ID_GEN = new AtomicLong();
@@ -77,21 +73,18 @@ public class Fragment implements RelSource {
* @param ctx Planner context.
* @param mq Metadata query used for data location calculation.
*/
- public void init(MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) {
+ public void init(MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) throws OptimisticPlanningException {
FragmentInfo info = IgniteMdFragmentInfo._fragmentInfo(root, mq);
- mapping = fragmentMapping(mappingService, ctx, info, mq);
+ mapping = fragmentMapping(mappingService, ctx, info);
- ImmutableList<Pair<IgniteReceiver, RelSource>> sources = info.sources();
+ if (F.isEmpty(info.targetAwareList()))
+ return;
- if (!F.isEmpty(sources)) {
- for (Pair<IgniteReceiver, RelSource> input : sources) {
- IgniteReceiver receiver = input.left;
- RelSource source = input.right;
+ RelTargetImpl target = new RelTargetImpl(id, mapping);
- source.bindToTarget(new RelTargetImpl(id, mapping, receiver.distribution()), mappingService, ctx, mq);
- }
- }
+ for (RelTargetAware aware : info.targetAwareList())
+ aware.target(target);
}
/**
@@ -101,23 +94,25 @@ public class Fragment implements RelSource {
return root;
}
- /** {@inheritDoc} */
- @Override public long fragmentId() {
+ /**
+ * @return Fragment ID.
+ */
+ public long fragmentId() {
return id;
}
- /** {@inheritDoc} */
- @Override public NodesMapping mapping() {
+ /**
+ * @return Fragment mapping.
+ */
+ public NodesMapping mapping() {
return mapping;
}
/** {@inheritDoc} */
- @Override public void bindToTarget(RelTarget target, MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) {
- assert !local();
-
- ((IgniteSender) root).target(target);
+ @Override public void target(RelTarget target) {
+ assert root instanceof RelTargetAware;
- init(mappingService, ctx, mq);
+ ((RelTargetAware) root).target(target);
}
/** */
@@ -126,7 +121,7 @@ public class Fragment implements RelSource {
}
/** */
- private NodesMapping fragmentMapping(MappingService mappingService, PlanningContext ctx, FragmentInfo info, RelMetadataQuery mq) {
+ private NodesMapping fragmentMapping(MappingService mappingService, PlanningContext ctx, FragmentInfo info) throws OptimisticPlanningException {
NodesMapping mapping;
try {
@@ -135,7 +130,7 @@ public class Fragment implements RelSource {
else if (local())
mapping = localMapping(ctx);
else {
- RelDistribution.Type type = IgniteMdDistribution._distribution(root, mq).getType();
+ RelDistribution.Type type = ((IgniteSender)root).sourceDistribution().getType();
boolean single = type == SINGLETON || type == BROADCAST_DISTRIBUTED;
@@ -144,7 +139,7 @@ public class Fragment implements RelSource {
}
}
catch (LocationMappingException e) {
- throw new OptimisticPlanningException("Failed to calculate physical distribution", new Edge(null, root, -1));
+ throw new OptimisticPlanningException("Failed to calculate physical distribution", root, e);
}
return mapping.deduplicate();
@@ -152,6 +147,6 @@ public class Fragment implements RelSource {
/** */
private NodesMapping localMapping(PlanningContext ctx) {
- return new NodesMapping(Collections.singletonList(ctx.localNodeId()), null, (byte) (NodesMapping.CLIENT | NodesMapping.DEDUPLICATED));
+ return new NodesMapping(Collections.singletonList(ctx.localNodeId()), null, NodesMapping.CLIENT);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
new file mode 100644
index 0000000..92b0cbe
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+
+/**
+ *
+ */
+public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
+ /** */
+ private RelMetadataQueryEx mq;
+
+ /** */
+ private List<Fragment> fragments;
+
+ /** */
+ private RelNode cutPoint;
+
+ /** */
+ public List<Fragment> go(Fragment fragment, RelNode cutPoint, RelMetadataQueryEx mq) {
+ this.cutPoint = cutPoint;
+ this.mq = mq;
+
+ fragments = new ArrayList<>();
+
+ try {
+ fragments.add(new Fragment(visit(fragment.root())));
+
+ Collections.reverse(fragments);
+
+ return fragments;
+ }
+ finally {
+ this.cutPoint = null;
+ this.mq = null;
+
+ fragments = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSender rel) {
+ // a split may happen on BiRel inputs merge. A sender node cannot be a BiRel input.
+ assert rel != cutPoint;
+
+ RelNode input = rel.getInput();
+ RelNode newInput = visit((IgniteRel) input);
+
+ if (input == newInput)
+ return rel;
+
+ return (IgniteRel) rel.copy(rel.getTraitSet(), ImmutableList.of(newInput));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteFilter rel) {
+ boolean split = rel == cutPoint;
+
+ RelNode input = rel.getInput();
+ RelNode newInput = visit((IgniteRel) input);
+
+ if (input != newInput) {
+ RelTraitSet traits = rel.getTraitSet()
+ .replace(IgniteMdDistribution.filter(mq, newInput, rel.getCondition()));
+
+ rel = (IgniteFilter) rel.copy(traits, ImmutableList.of(newInput));
+ }
+
+ return split ? split(rel, rel.getTraitSet()) : rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteProject rel) {
+ boolean split = rel == cutPoint;
+
+ RelNode input = rel.getInput();
+ RelNode newInput = visit((IgniteRel) input);
+
+ if (input != newInput) {
+ RelTraitSet traits = rel.getTraitSet()
+ .replace(IgniteMdDistribution.project(mq, newInput, rel.getProjects()));
+
+ rel = (IgniteProject) rel.copy(traits, ImmutableList.of(newInput));
+ }
+
+ return split ? split(rel, rel.getTraitSet()) : rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteJoin rel) {
+ boolean split = rel == cutPoint;
+
+ RelNode left = rel.getLeft();
+ RelNode right = rel.getRight();
+
+ RelNode newLeft = visit((IgniteRel) left);
+ RelNode newRight = visit((IgniteRel) right);
+
+ // Join requires input distribution and produces its own one.
+ // It cannot change the distribution on a child node change.
+ if (left != newLeft || right != newRight)
+ rel = (IgniteJoin) rel.copy(rel.getTraitSet(), ImmutableList.of(newLeft, newRight));
+
+ return split ? split(rel, rel.getTraitSet()) : rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableModify rel) {
+ boolean split = rel == cutPoint;
+
+ RelNode input = rel.getInput();
+ RelNode newInput = visit((IgniteRel) input);
+
+ if (input != newInput)
+ rel = (IgniteTableModify) rel.copy(rel.getTraitSet(), ImmutableList.of(newInput));
+
+ return split ? split(rel, rel.getTraitSet()) : rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableScan rel) {
+ return rel == cutPoint ? split(rel, rel.getTraitSet()) : rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReceiver rel) {
+ // a split may happen on BiRel inputs merge. A receiver doesn't have a
+ // physical mapping, so, its merge with any input cannot cause the split.
+ assert rel != cutPoint;
+
+ return rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteValues rel) {
+ // a split may happen on BiRel inputs merge. A values node doesn't have a
+ // physical mapping, so, its merge with any input cannot cause the split.
+ assert rel != cutPoint;
+
+ return rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteRel rel) {
+ return rel.accept(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteExchange rel) {
+ throw new AssertionError();
+ }
+
+ /** */
+ private IgniteRel split(IgniteRel input, RelTraitSet traits) {
+ RelOptCluster cluster = input.getCluster();
+
+ Fragment fragment = new Fragment(new IgniteSender(cluster, traits, input));
+
+ fragments.add(fragment);
+
+ return new IgniteReceiver(cluster, traits, input.getRowType(), fragment);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index b651406..397a9ef 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -118,9 +118,10 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
typeFactory = ctx.typeFactory();
catalogReader = ctx.catalogReader();
+ operatorTable = ctx.opTable();
+ conformance = ctx.conformance();
frameworkConfig = ctx.config();
- operatorTable = frameworkConfig.getOperatorTable();
programs = frameworkConfig.getPrograms();
parserConfig = frameworkConfig.getParserConfig();
sqlToRelConverterConfig = frameworkConfig.getSqlToRelConverterConfig();
@@ -129,7 +130,6 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
traitDefs = frameworkConfig.getTraitDefs();
rexBuilder = new RexBuilder(typeFactory);
- conformance = ctx.connectionConfig().conformance();
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
deleted file mode 100644
index 1f1cbf0..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.prepare;
-
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-
-/**
- *
- */
-public interface RelSource {
- /**
- * @return ID of a fragment, where data comes from.
- */
- long fragmentId();
-
- /**
- * Returns source mapping. The mapping contains nodes where data comes from.
- * It's used to determine that all sources sent all related data.
- *
- * @return Source mapping.
- */
- NodesMapping mapping();
-
- /**
- * Binds a source to target and starts source data location calculation.
- * After this method call the source knows where to send data and the target knows where to expect data from.
- * @param target Target.
- * @param mappingService
- * @param ctx Context.
- * @param mq Metadata query instance.
- */
- default void bindToTarget(RelTarget target, MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) {
- // No-op
- }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
index 1dcb6b4..904b468 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
@@ -17,16 +17,16 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import java.io.Serializable;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
/**
*
*/
-public interface RelTarget {
+public interface RelTarget extends Serializable {
/**
* @return Target fragment ID.
*/
@@ -39,9 +39,4 @@ public interface RelTarget {
* @return Target mapping.
*/
NodesMapping mapping();
-
- /**
- * @return Target distribution.
- */
- IgniteDistribution distribution();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetAware.java
similarity index 56%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetAware.java
index 8afbb01..a879b30 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetAware.java
@@ -17,35 +17,12 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import java.io.Serializable;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-
/**
- *
+ * Describes an object, that needs information about physical target distribution.
*/
-public class RelSourceImpl implements RelSource, Serializable {
- /** */
- private final long fragmentId;
-
- /** */
- private final NodesMapping mapping;
-
+public interface RelTargetAware {
/**
- * @param fragmentId Fragment ID.
- * @param mapping Source mapping.
+ * @param target Remote target information.
*/
- public RelSourceImpl(long fragmentId, NodesMapping mapping) {
- this.fragmentId = fragmentId;
- this.mapping = mapping;
- }
-
- /** {@inheritDoc} */
- @Override public long fragmentId() {
- return fragmentId;
- }
-
- /** {@inheritDoc} */
- @Override public NodesMapping mapping() {
- return mapping;
- }
+ void target(RelTarget target);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
index 570b81f..1c27bde 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
@@ -17,32 +17,25 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import java.io.Serializable;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
/**
*
*/
-public class RelTargetImpl implements RelTarget, Serializable {
+public class RelTargetImpl implements RelTarget {
/** */
private final long fragmentId;
/** */
private final NodesMapping mapping;
- /** */
- private final IgniteDistribution distribution;
-
/**
* @param fragmentId Target fragment ID.
* @param mapping Target mapping.
- * @param distribution Target distribution.
*/
- public RelTargetImpl(long fragmentId, NodesMapping mapping, IgniteDistribution distribution) {
+ public RelTargetImpl(long fragmentId, NodesMapping mapping) {
this.fragmentId = fragmentId;
this.mapping = mapping;
- this.distribution = distribution;
}
/** {@inheritDoc} */
@@ -54,9 +47,4 @@ public class RelTargetImpl implements RelTarget, Serializable {
@Override public NodesMapping mapping() {
return mapping;
}
-
- /** {@inheritDoc} */
- @Override public IgniteDistribution distribution() {
- return distribution;
- }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index b376c58..00cac1a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -108,14 +107,11 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
RelOptCluster cluster = rel.getCluster();
IgniteRel input = visit((IgniteRel) rel.getInput());
- RelTraitSet inTraits = input.getTraitSet();
- RelTraitSet outTraits = rel.getTraitSet();
-
- Fragment fragment = new Fragment(new IgniteSender(cluster, inTraits, input));
+ Fragment fragment = new Fragment(new IgniteSender(cluster, rel.getTraitSet(), input));
fragments.add(fragment);
- return new IgniteReceiver(cluster, outTraits, input.getRowType(), fragment);
+ return new IgniteReceiver(cluster, rel.getTraitSet(), input.getRowType(), fragment);
}
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 78083a1..0e474e3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
*/
public class IgniteReceiver extends AbstractRelNode implements IgniteRel {
/** */
- private final RelSource source;
+ private final Fragment source;
/**
* Creates a Receiver
@@ -42,9 +42,9 @@ public class IgniteReceiver extends AbstractRelNode implements IgniteRel {
* @param cluster Cluster that this relational expression belongs to
* @param traits Traits of this relational expression
* @param rowType Output row type
- * @param source Remote sources information.
+ * @param source Source fragment.
*/
- public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, RelSource source) {
+ public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Fragment source) {
super(cluster, traits);
this.rowType = rowType;
@@ -62,9 +62,9 @@ public class IgniteReceiver extends AbstractRelNode implements IgniteRel {
}
/**
- * @return Remote sources information.
+ * @return Source fragment.
*/
- public RelSource source() {
+ public Fragment source() {
return source;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index e89bece..403f34b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -20,15 +20,20 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTargetAware;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
/**
* Relational expression that iterates over its input
* and sends elements to remote {@link IgniteReceiver}
*/
-public class IgniteSender extends SingleRel implements IgniteRel {
+public class IgniteSender extends SingleRel implements IgniteRel, RelTargetAware {
/** */
private RelTarget target;
@@ -67,6 +72,11 @@ public class IgniteSender extends SingleRel implements IgniteRel {
return visitor.visit(this);
}
+ /** {@inheritDoc} */
+ @Override public void target(RelTarget target) {
+ this.target = target;
+ }
+
/**
* @return Remote targets information.
*/
@@ -75,9 +85,23 @@ public class IgniteSender extends SingleRel implements IgniteRel {
}
/**
- * @param target Remote targets information.
+ * @return Node distribution.
*/
- public void target(RelTarget target) {
- this.target = target;
+ public IgniteDistribution targetDistribution() {
+ return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+ }
+
+ /**
+ * @return Node distribution.
+ */
+ public IgniteDistribution sourceDistribution() {
+ return input.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+ }
+
+ /**
+ * @return Node collations.
+ */
+ public List<RelCollation> collations() {
+ return getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
index 55a6e24..9a79bbc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
@@ -56,7 +56,7 @@ public class JoinConverter extends IgniteConverter {
RelMetadataQuery mq = cluster.getMetadataQuery();
List<IgniteDistribution> leftTraits = IgniteMdDerivedDistribution.deriveDistributions(left, IgniteConvention.INSTANCE, mq);
- List<IgniteDistribution> rightTraits = IgniteMdDerivedDistribution.deriveDistributions(left, IgniteConvention.INSTANCE, mq);
+ List<IgniteDistribution> rightTraits = IgniteMdDerivedDistribution.deriveDistributions(right, IgniteConvention.INSTANCE, mq);
List<IgniteDistributions.BiSuggestion> suggestions = IgniteDistributions.suggestJoin(leftTraits, rightTraits, join.analyzeCondition(), join.getJoinType());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
index c11b793..04a32af 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
@@ -64,12 +64,12 @@ public class RelToPhysicalConverter implements IgniteRelVisitor<PhysicalRel> {
/** {@inheritDoc} */
@Override public PhysicalRel visit(IgniteSender rel) {
- long targetFragmentId = rel.target().fragmentId();
- NodesMapping targetMapping = rel.target().mapping();
- DistributionFunction targetFunction = rel.target().distribution().function();
- ImmutableIntList distributionKeys = rel.target().distribution().getKeys();
+ long fragmentId = rel.target().fragmentId();
+ NodesMapping mapping = rel.target().mapping();
+ DistributionFunction fun = rel.targetDistribution().function();
+ ImmutableIntList keys = rel.targetDistribution().getKeys();
- return new SenderPhysicalRel(targetFragmentId, targetMapping, targetFunction, distributionKeys, visit((IgniteRel) rel.getInput()));
+ return new SenderPhysicalRel(fragmentId, mapping, fun, keys, visit((IgniteRel) rel.getInput()));
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
index 5ee9192..085ce43 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
@@ -59,6 +59,18 @@ public abstract class DistributionFunction implements Serializable {
public abstract Destination destination(PartitionService partitionService, NodesMapping mapping, ImmutableIntList keys);
/**
+ * Creates a partition.
+ *
+ * @param partitionService Affinity function source.
+ * @param partitionsCount Expected partitions count.
+ * @param keys Distribution keys.
+ * @return Partition function.
+ */
+ public ToIntFunction<Object> partitionFunction(PartitionService partitionService, int partitionsCount, ImmutableIntList keys) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* @return Function name. This name used for equality checking and in {@link RelNode#getDigest()}.
*/
public final String name(){
@@ -208,10 +220,12 @@ public abstract class DistributionFunction implements Serializable {
assert F.isEmpty(assignment) || assignment.size() == 1;
}
- ToIntFunction<Object> rowToPart = DistributionFunction.rowToPart(
- partitionService.partitionFunction(CU.UNDEFINED_CACHE_ID), k.toIntArray());
+ return new Partitioned(m.nodes(), assignments, partitionFunction(partitionService, assignments.size(), k));
+ }
- return new Partitioned(m.nodes(), assignments, rowToPart);
+ /** {@inheritDoc} */
+ @Override public ToIntFunction<Object> partitionFunction(PartitionService partitionService, int partitionsCount, ImmutableIntList k) {
+ return DistributionFunction.rowToPart(partitionService.partitionFunction(CU.UNDEFINED_CACHE_ID), partitionsCount, k.toIntArray());
}
/** */
@@ -253,10 +267,12 @@ public abstract class DistributionFunction implements Serializable {
assert F.isEmpty(assignment) || assignment.size() == 1;
}
- ToIntFunction<Object> rowToPart = DistributionFunction.rowToPart(
- partitionService.partitionFunction(cacheId), k.toIntArray());
+ return new Partitioned(m.nodes(), assignments, partitionFunction(partitionService, assignments.size(), k));
+ }
- return new Partitioned(m.nodes(), assignments, rowToPart);
+ /** {@inheritDoc} */
+ @Override public ToIntFunction<Object> partitionFunction(PartitionService partitionService, int partitionsCount, ImmutableIntList k) {
+ return DistributionFunction.rowToPart(partitionService.partitionFunction(cacheId), partitionsCount, k.toIntArray());
}
/** {@inheritDoc} */
@@ -266,7 +282,7 @@ public abstract class DistributionFunction implements Serializable {
}
/** */
- private static ToIntFunction<Object> rowToPart(ToIntFunction<Object> keyToPart, int[] keys) {
+ private static ToIntFunction<Object> rowToPart(ToIntFunction<Object> keyToPart, int parts, int[] keys) {
return r -> {
Object[] row = (Object[]) r;
@@ -278,7 +294,7 @@ public abstract class DistributionFunction implements Serializable {
for (int i = 1; i < keys.length; i++)
hash = 31 * hash + keyToPart.applyAsInt(row[keys[i]]);
- return hash;
+ return U.safeAbs(hash) % parts;
};
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index e33de68..b97794b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -20,10 +20,8 @@ package org.apache.ignite.internal.processors.query.calcite.trait;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Exchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
/**
@@ -33,14 +31,17 @@ public class DistributionTraitDef extends RelTraitDef<IgniteDistribution> {
/** */
public static final DistributionTraitDef INSTANCE = new DistributionTraitDef();
+ /** {@inheritDoc} */
@Override public Class<IgniteDistribution> getTraitClass() {
return IgniteDistribution.class;
}
+ /** {@inheritDoc} */
@Override public String getSimpleName() {
return "distr";
}
+ /** {@inheritDoc} */
@Override public RelNode convert(RelOptPlanner planner, RelNode rel, IgniteDistribution targetDist, boolean allowInfiniteCostConverters) {
if (rel.getConvention() == Convention.NONE)
return null;
@@ -54,14 +55,8 @@ public class DistributionTraitDef extends RelTraitDef<IgniteDistribution> {
case HASH_DISTRIBUTED:
case BROADCAST_DISTRIBUTED:
case SINGLETON:
- Exchange exchange = new IgniteExchange(rel.getCluster(), rel.getTraitSet().replace(targetDist), rel, targetDist);
- RelNode newRel = planner.register(exchange, rel);
- RelTraitSet newTraits = rel.getTraitSet().replace(targetDist);
-
- if (!newRel.getTraitSet().equals(newTraits))
- newRel = planner.changeTraits(newRel, newTraits);
-
- return newRel;
+ return register(planner, rel,
+ new IgniteExchange(rel.getCluster(), rel.getTraitSet().replace(targetDist), rel, targetDist));
case ANY:
return rel;
default:
@@ -69,11 +64,23 @@ public class DistributionTraitDef extends RelTraitDef<IgniteDistribution> {
}
}
+ /** {@inheritDoc} */
@Override public boolean canConvert(RelOptPlanner planner, IgniteDistribution fromTrait, IgniteDistribution toTrait) {
return true;
}
+ /** {@inheritDoc} */
@Override public IgniteDistribution getDefault() {
return IgniteDistributions.any();
}
+
+ /** */
+ private RelNode register(RelOptPlanner planner, RelNode rel, RelNode replace) {
+ RelNode registered = planner.register(replace, rel);
+
+ if (!registered.getTraitSet().equals(replace.getTraitSet()))
+ registered = planner.changeTraits(registered, replace.getTraitSet());
+
+ return registered;
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 5855442..4fe2777 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
@@ -218,10 +219,10 @@ public class IgniteDistributions {
if (joinType == LEFT || joinType == RIGHT || (joinType == INNER && !F.isEmpty(joinInfo.pairs()))) {
HashSet<DistributionFunction> factories = U.newHashSet(3);
- if (leftIn.getKeys().equals(joinInfo.leftKeys))
+ if (Objects.equals(joinInfo.leftKeys, leftIn.getKeys()))
factories.add(leftIn.function());
- if (rightIn.getKeys().equals(joinInfo.rightKeys))
+ if (Objects.equals(joinInfo.rightKeys, rightIn.getKeys()))
factories.add(rightIn.function());
factories.add(HashDistribution.INSTANCE);
@@ -258,10 +259,10 @@ public class IgniteDistributions {
IgniteDistribution newLeft, IgniteDistribution newRight) {
int exch = 0;
- if (!left.satisfies(newLeft))
+ if (needsExchange(left, newLeft))
exch++;
- if (!right.satisfies(newRight))
+ if (needsExchange(right, newRight))
exch++;
dst.add(new BiSuggestion(out, newLeft, newRight, exch));
@@ -270,6 +271,11 @@ public class IgniteDistributions {
}
/** */
+ private static boolean needsExchange(IgniteDistribution sourceDist, IgniteDistribution targetDist) {
+ return !sourceDist.satisfies(targetDist);
+ }
+
+ /** */
@SuppressWarnings("SameParameterValue")
private static List<BiSuggestion> topN(ArrayList<BiSuggestion> src, int n) {
Collections.sort(src);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/Partitioned.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/Partitioned.java
index cd806ba..3fd0aed 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/Partitioned.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/Partitioned.java
@@ -41,7 +41,7 @@ public final class Partitioned implements Destination {
/** {@inheritDoc} */
@Override public List<UUID> targets(Object row) {
- return assignments.get(partFun.applyAsInt(row) % assignments.size());
+ return assignments.get(partFun.applyAsInt(row));
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index c6b07e5..01c63b6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.linq4j.tree.Primitive;
@@ -32,6 +33,7 @@ import org.apache.calcite.plan.Contexts;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
@@ -65,18 +67,22 @@ public final class Commons {
public static <T> List<T> intersect(List<T> left, List<T> right) {
if (F.isEmpty(left) || F.isEmpty(right))
return Collections.emptyList();
- else if (left.size() > right.size())
- return intersect0(right, left);
- else
- return intersect0(left, right);
+
+ return intersect0(left, right);
}
- /** */
- private static <T> List<T> intersect0(List<T> left, List<T> right) {
- List<T> res = new ArrayList<>(Math.min(left.size(), right.size()));
- HashSet<T> set = new HashSet<>(left);
+ /**
+ * Intersects a set and a list.
+ *
+ * @return A List of unique entries that presented in both the given set and the given list.
+ */
+ public static <T> List<T> intersect(Set<T> set, List<T> list) {
+ if (F.isEmpty(set) || F.isEmpty(list))
+ return Collections.emptyList();
- for (T t : right) {
+ List<T> res = new ArrayList<>(Math.min(set.size(), list.size()));
+
+ for (T t : list) {
if (set.contains(t))
res.add(t);
}
@@ -84,6 +90,14 @@ public final class Commons {
return res;
}
+ /** */
+ private static <T> List<T> intersect0(@NotNull List<T> left, @NotNull List<T> right) {
+ if (left.size() > right.size())
+ return intersect0(right, left);
+
+ return intersect(new HashSet<>(left), right);
+ }
+
/**
* Returns a given list as a typed list.
*/
@@ -201,10 +215,21 @@ public final class Commons {
/** */
public static RelDataType combinedRowType(IgniteTypeFactory typeFactory, RelDataType... types) {
- final RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+ RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+
+ Set<String> names = new HashSet<>();
- for (RelDataType type : types)
- builder.addAll(type.getFieldList());
+ for (RelDataType type : types) {
+ for (RelDataTypeField field : type.getFieldList()) {
+ int idx = 0;
+ String fieldName = field.getName();
+
+ while (!names.add(fieldName))
+ fieldName = field.getName() + idx++;
+
+ builder.add(fieldName, field.getType());
+ }
+ }
return builder.build();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
index 118b5c0..2fbf47a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
@@ -195,7 +195,7 @@ public class TableScan implements Iterable<Object[]> {
private void reserveReplicated(GridDhtPartitionTopology top) {
List<GridDhtLocalPartition> localParts = top.localPartitions();
- parts = new ArrayDeque<>(localParts);
+ parts = new ArrayDeque<>(localParts.size());
for (GridDhtLocalPartition local : localParts) {
if (!local.reserve())
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index c54eec9..5af47fb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -93,6 +94,44 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
}
@Test
+ public void query2() throws Exception {
+ IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
+ .setName("developer")
+ .setSqlSchema("PUBLIC")
+ .setIndexedTypes(Integer.class, Developer.class)
+ .setCacheMode(CacheMode.REPLICATED)
+ );
+
+ IgniteCache<Integer, Project> project = ignite.getOrCreateCache(new CacheConfiguration<Integer, Project>()
+ .setName("project")
+ .setSqlSchema("PUBLIC")
+ .setIndexedTypes(Integer.class, Project.class)
+ .setBackups(2)
+ );
+
+ waitForReadyTopology(internalCache(project).context().topology(), new AffinityTopologyVersion(5, 3));
+
+ project.putAll(ImmutableMap.of(
+ 0, new Project("Ignite"),
+ 1, new Project("Calcite")
+ ));
+
+ developer.putAll(ImmutableMap.of(
+ 0, new Developer("Igor", 1),
+ 1, new Developer("Roman", 0)
+ ));
+
+ QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
+ "select * from DEVELOPER d, PROJECT p where d.projectId = p._key and d._key = ?", 0);
+
+ assertEquals(1, query.size());
+
+ assertEqualsCollections(Arrays.asList("Igor", 1, "Calcite"), F.first(query.get(0).getAll()));
+ }
+
+ @Test
public void queryMultiStatement() throws Exception {
IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
.setName("developer")