You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/03/16 22:00:22 UTC

[ignite] branch ignite-12248 updated: IGNITE-12776: Calcite integration. An exception occurred when a replicated cache is used in a query. #7519

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 47a2da9  IGNITE-12776: Calcite integration. An exception occurred when a replicated cache is used in a query. #7519
47a2da9 is described below

commit 47a2da98851050ae7c64797cebb30bb9529d7180
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Mar 17 01:00:07 2020 +0300

    IGNITE-12776: Calcite integration. An exception occurred when a replicated cache is used in a query. #7519
---
 .../query/calcite/exec/LogicalRelImplementor.java  |   9 +-
 .../query/calcite/metadata/FragmentInfo.java       |  28 ++-
 .../calcite/metadata/IgniteMdFragmentInfo.java     |  34 ++--
 .../query/calcite/metadata/MappingServiceImpl.java |  17 +-
 .../query/calcite/metadata/NodesMapping.java       | 128 ++++++-------
 .../metadata/OptimisticPlanningException.java      |  26 +--
 .../calcite/metadata/PartitionServiceImpl.java     |   3 +-
 .../calcite/prepare/AbstractMultiStepPlan.java     |  53 ++----
 .../processors/query/calcite/prepare/Edge.java     |  66 -------
 .../processors/query/calcite/prepare/Fragment.java |  51 +++---
 .../query/calcite/prepare/FragmentSplitter.java    | 198 +++++++++++++++++++++
 .../query/calcite/prepare/IgnitePlanner.java       |   4 +-
 .../query/calcite/prepare/RelSource.java           |  52 ------
 .../query/calcite/prepare/RelTarget.java           |   9 +-
 .../{RelSourceImpl.java => RelTargetAware.java}    |  31 +---
 .../query/calcite/prepare/RelTargetImpl.java       |  16 +-
 .../processors/query/calcite/prepare/Splitter.java |   8 +-
 .../query/calcite/rel/IgniteReceiver.java          |  12 +-
 .../processors/query/calcite/rel/IgniteSender.java |  32 +++-
 .../query/calcite/rule/JoinConverter.java          |   2 +-
 .../calcite/serialize/RelToPhysicalConverter.java  |  10 +-
 .../query/calcite/trait/DistributionFunction.java  |  32 +++-
 .../query/calcite/trait/DistributionTraitDef.java  |  27 +--
 .../query/calcite/trait/IgniteDistributions.java   |  14 +-
 .../query/calcite/trait/Partitioned.java           |   2 +-
 .../processors/query/calcite/util/Commons.java     |  49 +++--
 .../processors/query/calcite/util/TableScan.java   |   2 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  39 ++++
 28 files changed, 526 insertions(+), 428 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 49b92ef..baea3f2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
 import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
-import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -97,12 +97,11 @@ public class LogicalRelImplementor implements IgniteRelVisitor<Node<Object[]>> {
     /** {@inheritDoc} */
     @Override public Node<Object[]> visit(IgniteSender rel) {
         RelTarget target = rel.target();
-        long targetFragmentId = target.fragmentId();
-        IgniteDistribution distribution = target.distribution();
+        IgniteDistribution distribution = rel.targetDistribution();
         Destination destination = distribution.function().destination(partitionService, target.mapping(), distribution.getKeys());
 
         // Outbox fragment ID is used as exchange ID as well.
-        Outbox<Object[]> outbox = new Outbox<>(ctx, exchangeService, mailboxRegistry, ctx.fragmentId(), targetFragmentId, destination);
+        Outbox<Object[]> outbox = new Outbox<>(ctx, exchangeService, mailboxRegistry, ctx.fragmentId(), target.fragmentId(), destination);
         outbox.register(visit(rel.getInput()));
 
         mailboxRegistry.register(outbox);
@@ -167,7 +166,7 @@ public class LogicalRelImplementor implements IgniteRelVisitor<Node<Object[]>> {
 
     /** {@inheritDoc} */
     @Override public Node<Object[]> visit(IgniteReceiver rel) {
-        RelSource source = rel.source();
+        Fragment source = rel.source();
 
         // Corresponding outbox fragment ID is used as exchange ID as well.
         Inbox<Object[]> inbox = (Inbox<Object[]>) mailboxRegistry.register(new Inbox<>(ctx, exchangeService, mailboxRegistry, source.fragmentId(), source.fragmentId()));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index e787d66..c9bc75a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -18,10 +18,8 @@
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTargetAware;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
@@ -33,7 +31,7 @@ public class FragmentInfo {
     private final NodesMapping mapping;
 
     /** */
-    private final ImmutableList<Pair<IgniteReceiver, RelSource>> sources;
+    private final ImmutableList<RelTargetAware> targetAwareList;
 
     /**
      * Constructs Values leaf fragment info.
@@ -45,10 +43,10 @@ public class FragmentInfo {
     /**
      * Constructs Receiver leaf fragment info.
      *
-     * @param source Pair of a Receiver relational node and its data source information.
+     * @param targetAware A node that needs target information.
      */
-    public FragmentInfo(Pair<IgniteReceiver, RelSource> source) {
-        this(ImmutableList.of(source), null);
+    public FragmentInfo(RelTargetAware targetAware) {
+        this(ImmutableList.of(targetAware), null);
     }
 
     /**
@@ -63,11 +61,11 @@ public class FragmentInfo {
     /**
      * Used on merge of two relational node tree edges.
      *
-     * @param sources Pairs of underlying Receiver relational nodes and theirs data source information.
+     * @param targetAwareList nodes that need target information.
      * @param mapping Nodes mapping, describing where interested data placed.
      */
-    public FragmentInfo(ImmutableList<Pair<IgniteReceiver, RelSource>> sources, NodesMapping mapping) {
-        this.sources = sources;
+    public FragmentInfo(ImmutableList<RelTargetAware> targetAwareList, NodesMapping mapping) {
+        this.targetAwareList = targetAwareList;
         this.mapping = mapping;
     }
 
@@ -86,10 +84,10 @@ public class FragmentInfo {
     }
 
     /**
-     * @return Pairs of underlying Receiver relational nodes and theirs data source information.
+     * @return Nodes that need target information..
      */
-    public ImmutableList<Pair<IgniteReceiver, RelSource>> sources() {
-        return sources;
+    public ImmutableList<RelTargetAware> targetAwareList() {
+        return targetAwareList;
     }
 
     /**
@@ -100,7 +98,7 @@ public class FragmentInfo {
      */
     public FragmentInfo merge(FragmentInfo other) throws LocationMappingException {
         return new FragmentInfo(
-            merge(sources(), other.sources()),
+            merge(targetAwareList(), other.targetAwareList()),
             merge(mapping(), other.mapping()));
     }
 
@@ -115,7 +113,7 @@ public class FragmentInfo {
             NodesMapping newMapping = mapping.prune(filter);
 
             if (newMapping != mapping)
-                return new FragmentInfo(sources, newMapping);
+                return new FragmentInfo(targetAwareList, newMapping);
         }
 
         return this;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 26d5212..23bce2c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.core.Join;
@@ -29,9 +28,7 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
-import org.apache.ignite.internal.processors.query.calcite.prepare.Edge;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
@@ -110,19 +107,22 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
             return left.merge(right);
         }
         catch (LocationMappingException e) {
+            String msg = "Failed to calculate physical distribution";
+
             // a replicated cache is cheaper to redistribute
             if (!left.mapping().hasPartitionedCaches())
-                throw planningException(rel, e, true);
+                throw new OptimisticPlanningException(msg, rel.getLeft(), e);
             else if (!right.mapping().hasPartitionedCaches())
-                throw planningException(rel, e, false);
-
-            // both sub-trees have partitioned sources, less cost is better
-            RelOptCluster cluster = rel.getCluster();
+                throw new OptimisticPlanningException(msg, rel.getRight(), e);
+            else {
+                // both sub-trees have partitioned sources, less cost is better
+                RelOptCluster cluster = rel.getCluster();
 
-            RelOptCost leftCost = rel.getLeft().computeSelfCost(cluster.getPlanner(), mq);
-            RelOptCost rightCost = rel.getRight().computeSelfCost(cluster.getPlanner(), mq);
+                RelOptCost leftCost = rel.getLeft().computeSelfCost(cluster.getPlanner(), mq);
+                RelOptCost rightCost = rel.getRight().computeSelfCost(cluster.getPlanner(), mq);
 
-            throw planningException(rel, e, leftCost.isLe(rightCost));
+                throw new OptimisticPlanningException(msg, leftCost.isLe(rightCost) ? rel.getLeft() : rel.getRight(), e);
+            }
         }
     }
 
@@ -130,7 +130,7 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
      * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
      */
     public FragmentInfo fragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
-        return new FragmentInfo(Pair.of(rel, rel.source()));
+        return new FragmentInfo(rel.source());
     }
 
     /**
@@ -156,14 +156,4 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
     public static FragmentInfo _fragmentInfo(RelNode rel, RelMetadataQuery mq) {
         return RelMetadataQueryEx.wrap(mq).getFragmentInfo(rel);
     }
-
-    /** */
-    private static OptimisticPlanningException planningException(BiRel rel, Exception cause, boolean splitLeft) {
-        String msg = "Failed to calculate physical distribution";
-
-        if (splitLeft)
-            return new OptimisticPlanningException(msg, new Edge(rel, rel.getLeft(), 0), cause);
-
-        return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause);
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingServiceImpl.java
index f790c63..25c38dd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingServiceImpl.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -30,12 +32,15 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping.DEDUPLICATED;
-
 /**
  *
  */
 public class MappingServiceImpl extends AbstractService implements MappingService {
+    /**
+     * Max nodes count, used on to-hash or to-random redistribution.
+     */
+    private static final int MAX_BUCKETS_COUNT = IgniteSystemProperties.getInteger("IGNITE_CALCITE_MAX_BUCKETS_COUNT", 1024);
+
     /** */
     private GridDiscoveryManager discoveryManager;
 
@@ -62,17 +67,19 @@ public class MappingServiceImpl extends AbstractService implements MappingServic
     @Override public NodesMapping mapBalanced(@NotNull AffinityTopologyVersion topVer, int desiredCnt, @Nullable Predicate<ClusterNode> nodeFilter) {
         assert desiredCnt >= 0;
 
-        List<ClusterNode> nodes = discoveryManager.discoCache(topVer).serverNodes();
+        desiredCnt = desiredCnt == 0 ? MAX_BUCKETS_COUNT : Math.min(desiredCnt, MAX_BUCKETS_COUNT);
+
+        List<ClusterNode> nodes = new ArrayList<>(discoveryManager.discoCache(topVer).serverNodes());
 
         if (nodeFilter != null)
             nodes = nodes.stream().filter(nodeFilter).collect(Collectors.toList());
 
-        if (desiredCnt != 0 && desiredCnt < nodes.size()) {
+        if (desiredCnt < nodes.size()) {
             Collections.shuffle(nodes);
 
             nodes = nodes.subList(0, desiredCnt);
         }
 
-        return new NodesMapping(Commons.transform(nodes, ClusterNode::id), null, DEDUPLICATED);
+        return new NodesMapping(Commons.transform(nodes, ClusterNode::id), null, (byte) 0);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index cd28a28..3bb89b0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -19,12 +19,10 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -64,6 +62,7 @@ public class NodesMapping implements Serializable {
     /** */
     private final byte flags;
 
+    /** */
     public NodesMapping(List<UUID> nodes, List<List<UUID>> assignments, byte flags) {
         this.nodes = nodes;
         this.assignments = assignments;
@@ -105,22 +104,19 @@ public class NodesMapping implements Serializable {
     public NodesMapping mergeWith(NodesMapping other) throws LocationMappingException {
         byte flags = (byte) (this.flags | other.flags);
 
-        if ((flags & (PARTIALLY_REPLICATED | CLIENT)) == 0)
-            return new NodesMapping(U.firstNotNull(nodes, other.nodes), mergeAssignments(other, null), flags);
-
-        List<UUID> nodes;
+        List<UUID> nodes = intersectReplicated(this.nodes, other.nodes);
 
-        if (this.nodes == null)
-            nodes = other.nodes;
-        else if (other.nodes == null)
-            nodes = this.nodes;
-        else
-            nodes = Commons.intersect(this.nodes, other.nodes);
+        // if there is no moving partitions both assignments are identical.
+        List<List<UUID>> assignments = (flags & HAS_MOVING_PARTITIONS) == 0
+            ? U.firstNotNull(this.assignments, other.assignments)
+            : intersectPartitioned(this.assignments, other.assignments);
 
-        if (nodes != null && nodes.isEmpty())
-            throw new LocationMappingException("Failed to map fragment to location.");
+        // In case all involved replicated caches are available on
+        // all nodes it's no need to check assignments against them.
+        if ((flags & (PARTIALLY_REPLICATED | CLIENT)) == 0)
+            return new NodesMapping(nodes, assignments, flags);
 
-        return new NodesMapping(nodes, mergeAssignments(other, nodes), flags);
+        return new NodesMapping(nodes, intersectReplicatedPartitioned(nodes, assignments), flags);
     }
 
     /**
@@ -132,31 +128,32 @@ public class NodesMapping implements Serializable {
      * @return Nodes mapping, containing nodes, that actually will be in charge of query execution.
      */
     public NodesMapping deduplicate() {
-        if (!excessive())
+        if ((flags & DEDUPLICATED) == DEDUPLICATED)
             return this;
 
         if (assignments == null) {
-            UUID node = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+            List<List<UUID>> assignments0 = new ArrayList<>(nodes.size());
+
+            for (UUID node : nodes)
+                assignments0.add(F.asList(node));
 
-            return new NodesMapping(Collections.singletonList(node), null, (byte)(flags | DEDUPLICATED));
+            return new NodesMapping(nodes, assignments0, (byte)(flags | DEDUPLICATED));
         }
 
-        HashSet<UUID> nodes0 = new HashSet<>();
+        HashSet<UUID> nodesSet = new HashSet<>();
+        ArrayList<UUID> nodes0 = new ArrayList<>();
         List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
 
         for (List<UUID> partNodes : assignments) {
             UUID node = F.first(partNodes);
 
-            if (node == null)
-                assignments0.add(Collections.emptyList());
-            else {
-                assignments0.add(Collections.singletonList(node));
+            assignments0.add(F.asList(node));
 
+            if (node != null && nodesSet.add(node))
                 nodes0.add(node);
-            }
         }
 
-        return new NodesMapping(new ArrayList<>(nodes0), assignments0, (byte)(flags | DEDUPLICATED));
+        return new NodesMapping(nodes0, assignments0, (byte)(flags | DEDUPLICATED));
     }
 
     /**
@@ -166,8 +163,7 @@ public class NodesMapping implements Serializable {
      * @return List of partitions to scan on the given node.
      */
     public int[] partitions(UUID node) {
-        if (assignments == null)
-            return null;
+        assert (flags & DEDUPLICATED) == DEDUPLICATED;
 
         GridIntList parts = new GridIntList(assignments.size());
 
@@ -181,13 +177,6 @@ public class NodesMapping implements Serializable {
     }
 
     /**
-     * @return {@code True} if mapping is excessive.
-     */
-    public boolean excessive() {
-        return (flags & DEDUPLICATED) == 0;
-    }
-
-    /**
      * @return {@code True} if some of involved partitioned tables are being rebalanced.
      */
     public boolean hasMovingPartitions() {
@@ -218,59 +207,56 @@ public class NodesMapping implements Serializable {
     }
 
     /** */
-    private List<List<UUID>> mergeAssignments(NodesMapping other, List<UUID> nodes) throws LocationMappingException {
-        byte flags = (byte) (this.flags | other.flags);
-        List<List<UUID>> left = assignments;
-        List<List<UUID>> right = other.assignments;
+    private List<UUID> intersectReplicated(List<UUID> left, List<UUID> right) throws LocationMappingException {
+        if (left == null || right == null)
+            return U.firstNotNull(left, right);
 
-        if (left == null && right == null)
-            return null; // nothing to intersect;
+        List<UUID> nodes = Commons.intersect(right, left);
 
-        if (left == null || right == null || (flags & HAS_MOVING_PARTITIONS) == 0) {
-            List<List<UUID>> assignments = U.firstNotNull(left, right);
+        if (F.isEmpty(nodes)) // replicated caches aren't co-located on all nodes.
+            throw new LocationMappingException("Failed to map fragment to location. Replicated query parts are not co-located on all nodes");
 
-            if (nodes == null)
-                return assignments;
+        return nodes;
+    }
 
-            List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
-            HashSet<UUID> nodesSet = new HashSet<>(nodes);
+    /** */
+    private List<List<UUID>> intersectPartitioned(List<List<UUID>> left, List<List<UUID>> right) throws LocationMappingException {
+        if (left == null || right == null)
+            return U.firstNotNull(left, right);
 
-            for (List<UUID> partNodes : assignments) {
-                List<UUID> partNodes0 = new ArrayList<>(partNodes.size());
+        assert left.size() == right.size();
 
-                for (UUID partNode : partNodes) {
-                    if (nodesSet.contains(partNode))
-                        partNodes0.add(partNode);
-                }
+        List<List<UUID>> res = new ArrayList<>(left.size());
 
-                if (partNodes0.isEmpty()) // TODO check with partition filters
-                    throw new LocationMappingException("Failed to map fragment to location.");
+        for (int i = 0; i < left.size(); i++) {
+            List<UUID> partNodes = Commons.intersect(left.get(i), right.get(i));
 
-                assignments0.add(partNodes0);
-            }
+            if (partNodes.isEmpty()) // TODO check with partition filters
+                throw new LocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
 
-            return assignments0;
+            res.add(partNodes);
         }
 
-        List<List<UUID>> assignments = new ArrayList<>(left.size());
-        HashSet<UUID> nodesSet = nodes != null ? new HashSet<>(nodes) : null;
+        return res;
+    }
+
+    /** */
+    private List<List<UUID>> intersectReplicatedPartitioned(List<UUID> nodes, List<List<UUID>> assignments) throws LocationMappingException {
+        if (nodes == null || assignments == null)
+            return assignments;
 
-        for (int i = 0; i < left.size(); i++) {
-            List<UUID> leftNodes = left.get(i);
-            List<UUID> partNodes = new ArrayList<>(leftNodes.size());
-            HashSet<UUID> rightNodesSet = new HashSet<>(right.get(i));
+        HashSet<UUID> nodesSet = new HashSet<>(nodes);
+        List<List<UUID>> res = new ArrayList<>(assignments.size());
 
-            for (UUID partNode : leftNodes) {
-                if (rightNodesSet.contains(partNode) && (nodesSet == null || nodesSet.contains(partNode)))
-                    partNodes.add(partNode);
-            }
+        for (int i = 0; i < assignments.size(); i++) {
+            List<UUID> partNodes = Commons.intersect(nodesSet, assignments.get(i));
 
-            if (partNodes.isEmpty())
-                throw new LocationMappingException("Failed to map fragment to location.");
+            if (partNodes.isEmpty()) // TODO check with partition filters
+                throw new LocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
 
-            assignments.add(partNodes);
+            res.add(partNodes);
         }
 
-        return assignments;
+        return res;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
index 8857b45..25dcc1f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
@@ -17,40 +17,30 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import org.apache.ignite.internal.processors.query.calcite.prepare.Edge;
+import org.apache.calcite.rel.RelNode;
 
 /**
  *
  */
 public class OptimisticPlanningException extends RuntimeException{
     /** */
-    private final Edge edge;
+    private final RelNode node;
 
     /**
      *
      * @param message Message.
-     * @param edge Edge of query plan, where the exception was thrown.
+     * @param node Node of a query plan, where the exception was thrown.
      * @param cause Cause.
      */
-    public OptimisticPlanningException(String message, Edge edge, Throwable cause) {
+    public OptimisticPlanningException(String message, RelNode node, Throwable cause) {
         super(message, cause);
-        this.edge = edge;
+        this.node = node;
     }
 
     /**
-     *
-     * @param message Message.
-     * @param edge Edge of query plan, where the exception was thrown.
-     */
-    public OptimisticPlanningException(String message, Edge edge) {
-        super(message);
-        this.edge = edge;
-    }
-
-    /**
-     * @return Edge of query plan, where the exception was thrown.
+     * @return Node of a query plan, where the exception was thrown.
      */
-    public Edge edge() {
-        return edge;
+    public RelNode node() {
+        return node;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/PartitionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/PartitionServiceImpl.java
index 4c01d8d..ed85c4e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/PartitionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/PartitionServiceImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
@@ -53,7 +54,7 @@ public class PartitionServiceImpl extends AbstractService implements PartitionSe
     /** {@inheritDoc} */
     @Override public ToIntFunction<Object> partitionFunction(int cacheId) {
         if (cacheId == CU.UNDEFINED_CACHE_ID)
-            return k -> k == null ? 0 : k.hashCode();
+            return k -> k == null ? 0 : U.safeAbs(k.hashCode());
 
         AffinityFunction affinity = cacheSharedContext.cacheContext(cacheId).group().affinityFunction();
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 27f1963..99f316b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -18,16 +18,11 @@
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
@@ -57,48 +52,32 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
 
     /** {@inheritDoc} */
     @Override public void init(MappingService mappingService, PlanningContext ctx) {
-        int i = 0;
-
         RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
 
-        while (true) {
+        for (int i = 0, j = 0; i < fragments.size();) {
+            Fragment fragment = fragments.get(i);
+
             try {
-                F.first(fragments).init(mappingService, ctx, mq);
+                fragment.init(mappingService, ctx, mq);
 
-                break;
+                i++;
             }
             catch (OptimisticPlanningException e) {
-                if (++i > 3)
+                if (++j > 3)
                     throw new IgniteSQLException("Failed to map query.", e);
 
-                Edge edge = e.edge();
-
-                RelNode parent = edge.parent();
-                RelNode child = edge.child();
-
-                RelOptCluster cluster = child.getCluster();
-                RelTraitSet traitSet = child.getTraitSet();
-
-                Fragment fragment = new Fragment(new IgniteSender(cluster, traitSet, child));
-
-                fragments.add(fragment);
-
-                if (parent != null)
-                    parent.replaceInput(edge.childIndex(), new IgniteReceiver(cluster, traitSet, child.getRowType(), fragment));
-                else {
-                    // need to fix a distribution of a root of a fragment
-                    int idx = 0;
+                replace(fragment, new FragmentSplitter().go(fragment, e.node(), mq));
 
-                    for (; idx < fragments.size(); idx++) {
-                        if (fragments.get(idx).root() == child)
-                            break;
-                    }
-
-                    assert idx < fragments.size();
-
-                    fragments.set(idx, new Fragment(new IgniteReceiver(cluster, traitSet, child.getRowType(), fragment)));
-                }
+                i = 0; // restart init routine.
             }
         }
     }
+
+    /** */
+    private void replace(Fragment fragment, List<Fragment> replacement) {
+        assert !F.isEmpty(replacement);
+
+        fragments.set(fragments.indexOf(fragment), F.first(replacement));
+        fragments.addAll(replacement.subList(1, replacement.size()));
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
deleted file mode 100644
index fe0f87b..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.prepare;
-
-import org.apache.calcite.rel.RelNode;
-
-/**
- * Describes RelNode tree edge.
- */
-public class Edge {
-    /** */
-    private final RelNode parent;
-
-    /** */
-    private final RelNode child;
-
-    /** */
-    private final int childIdx;
-
-    /**
-     * @param parent Parent node.
-     * @param child Child node.
-     * @param childIdx Child node index in parent.
-     */
-    public Edge(RelNode parent, RelNode child, int childIdx) {
-        this.parent = parent;
-        this.child = child;
-        this.childIdx = childIdx;
-    }
-
-    /**
-     * @return Parent node.
-     */
-    public RelNode parent() {
-        return parent;
-    }
-
-    /**
-     * @return Child node.
-     */
-    public RelNode child() {
-        return child;
-    }
-
-    /**
-     * @return Child node index in parent.
-     */
-    public int childIndex() {
-        return childIdx;
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index b9ef7c3..43251b1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -17,20 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import com.google.common.collect.ImmutableList;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.util.typedef.F;
@@ -41,7 +37,7 @@ import static org.apache.calcite.rel.RelDistribution.Type.SINGLETON;
 /**
  * Fragment of distributed query
  */
-public class Fragment implements RelSource {
+public class Fragment implements RelTargetAware {
     /** */
     private static final AtomicLong ID_GEN = new AtomicLong();
 
@@ -77,21 +73,18 @@ public class Fragment implements RelSource {
      * @param ctx Planner context.
      * @param mq Metadata query used for data location calculation.
      */
-    public void init(MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) {
+    public void init(MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) throws OptimisticPlanningException {
         FragmentInfo info = IgniteMdFragmentInfo._fragmentInfo(root, mq);
 
-        mapping = fragmentMapping(mappingService, ctx, info, mq);
+        mapping = fragmentMapping(mappingService, ctx, info);
 
-        ImmutableList<Pair<IgniteReceiver, RelSource>> sources = info.sources();
+        if (F.isEmpty(info.targetAwareList()))
+            return;
 
-        if (!F.isEmpty(sources)) {
-            for (Pair<IgniteReceiver, RelSource> input : sources) {
-                IgniteReceiver receiver = input.left;
-                RelSource source = input.right;
+        RelTargetImpl target = new RelTargetImpl(id, mapping);
 
-                source.bindToTarget(new RelTargetImpl(id, mapping, receiver.distribution()), mappingService, ctx, mq);
-            }
-        }
+        for (RelTargetAware aware : info.targetAwareList())
+            aware.target(target);
     }
 
     /**
@@ -101,23 +94,25 @@ public class Fragment implements RelSource {
         return root;
     }
 
-    /** {@inheritDoc} */
-    @Override public long fragmentId() {
+    /**
+     * @return Fragment ID.
+     */
+    public long fragmentId() {
         return id;
     }
 
-    /** {@inheritDoc} */
-    @Override public NodesMapping mapping() {
+    /**
+     * @return Fragment mapping.
+     */
+    public NodesMapping mapping() {
         return mapping;
     }
 
     /** {@inheritDoc} */
-    @Override public void bindToTarget(RelTarget target, MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) {
-        assert !local();
-
-        ((IgniteSender) root).target(target);
+    @Override public void target(RelTarget target) {
+        assert root instanceof RelTargetAware;
 
-        init(mappingService, ctx, mq);
+        ((RelTargetAware) root).target(target);
     }
 
     /** */
@@ -126,7 +121,7 @@ public class Fragment implements RelSource {
     }
 
     /** */
-    private NodesMapping fragmentMapping(MappingService mappingService, PlanningContext ctx, FragmentInfo info, RelMetadataQuery mq) {
+    private NodesMapping fragmentMapping(MappingService mappingService, PlanningContext ctx, FragmentInfo info) throws OptimisticPlanningException {
         NodesMapping mapping;
 
         try {
@@ -135,7 +130,7 @@ public class Fragment implements RelSource {
             else if (local())
                 mapping = localMapping(ctx);
             else {
-                RelDistribution.Type type = IgniteMdDistribution._distribution(root, mq).getType();
+                RelDistribution.Type type = ((IgniteSender)root).sourceDistribution().getType();
 
                 boolean single = type == SINGLETON || type == BROADCAST_DISTRIBUTED;
 
@@ -144,7 +139,7 @@ public class Fragment implements RelSource {
             }
         }
         catch (LocationMappingException e) {
-            throw new OptimisticPlanningException("Failed to calculate physical distribution", new Edge(null, root, -1));
+            throw new OptimisticPlanningException("Failed to calculate physical distribution", root, e);
         }
 
         return mapping.deduplicate();
@@ -152,6 +147,6 @@ public class Fragment implements RelSource {
 
     /** */
     private NodesMapping localMapping(PlanningContext ctx) {
-        return new NodesMapping(Collections.singletonList(ctx.localNodeId()), null, (byte) (NodesMapping.CLIENT | NodesMapping.DEDUPLICATED));
+        return new NodesMapping(Collections.singletonList(ctx.localNodeId()), null, NodesMapping.CLIENT);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
new file mode 100644
index 0000000..92b0cbe
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+
+/**
+ *
+ */
+public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
+    /** */
+    private RelMetadataQueryEx mq;
+
+    /** */
+    private List<Fragment> fragments;
+
+    /** */
+    private RelNode cutPoint;
+
+    /** */
+    public List<Fragment> go(Fragment fragment, RelNode cutPoint, RelMetadataQueryEx mq) {
+        this.cutPoint = cutPoint;
+        this.mq = mq;
+
+        fragments = new ArrayList<>();
+
+        try {
+            fragments.add(new Fragment(visit(fragment.root())));
+
+            Collections.reverse(fragments);
+
+            return fragments;
+        }
+        finally {
+            this.cutPoint = null;
+            this.mq = null;
+
+            fragments = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSender rel) {
+        // a split may happen on BiRel inputs merge. A sender node cannot be a BiRel input.
+        assert rel != cutPoint;
+
+        RelNode input = rel.getInput();
+        RelNode newInput = visit((IgniteRel) input);
+
+        if (input == newInput)
+            return rel;
+
+        return (IgniteRel) rel.copy(rel.getTraitSet(), ImmutableList.of(newInput));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteFilter rel) {
+        boolean split = rel == cutPoint;
+
+        RelNode input = rel.getInput();
+        RelNode newInput = visit((IgniteRel) input);
+
+        if (input != newInput) {
+            RelTraitSet traits = rel.getTraitSet()
+                .replace(IgniteMdDistribution.filter(mq, newInput, rel.getCondition()));
+
+            rel = (IgniteFilter) rel.copy(traits, ImmutableList.of(newInput));
+        }
+
+        return split ? split(rel, rel.getTraitSet()) : rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteProject rel) {
+        boolean split = rel == cutPoint;
+
+        RelNode input = rel.getInput();
+        RelNode newInput = visit((IgniteRel) input);
+
+        if (input != newInput) {
+            RelTraitSet traits = rel.getTraitSet()
+                .replace(IgniteMdDistribution.project(mq, newInput, rel.getProjects()));
+
+            rel = (IgniteProject) rel.copy(traits, ImmutableList.of(newInput));
+        }
+
+        return split ? split(rel, rel.getTraitSet()) : rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteJoin rel) {
+        boolean split = rel == cutPoint;
+
+        RelNode left = rel.getLeft();
+        RelNode right = rel.getRight();
+
+        RelNode newLeft = visit((IgniteRel) left);
+        RelNode newRight = visit((IgniteRel) right);
+
+        // Join requires input distribution and produces its own one.
+        // It cannot change the distribution on a child node change.
+        if (left != newLeft || right != newRight)
+            rel = (IgniteJoin) rel.copy(rel.getTraitSet(), ImmutableList.of(newLeft, newRight));
+
+        return split ? split(rel, rel.getTraitSet()) : rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableModify rel) {
+        boolean split = rel == cutPoint;
+
+        RelNode input = rel.getInput();
+        RelNode newInput = visit((IgniteRel) input);
+
+        if (input != newInput)
+            rel = (IgniteTableModify) rel.copy(rel.getTraitSet(), ImmutableList.of(newInput));
+
+        return split ? split(rel, rel.getTraitSet()) : rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableScan rel) {
+        return rel == cutPoint ? split(rel, rel.getTraitSet()) : rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteReceiver rel) {
+        // a split may happen on BiRel inputs merge. A receiver doesn't have a
+        // physical mapping, so, its merge with any input cannot cause the split.
+        assert rel != cutPoint;
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteValues rel) {
+        // a split may happen on BiRel inputs merge. A values node doesn't have a
+        // physical mapping, so, its merge with any input cannot cause the split.
+        assert rel != cutPoint;
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteRel rel) {
+        return rel.accept(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteExchange rel) {
+        throw new AssertionError();
+    }
+
+    /** */
+    private IgniteRel split(IgniteRel input, RelTraitSet traits) {
+        RelOptCluster cluster = input.getCluster();
+
+        Fragment fragment = new Fragment(new IgniteSender(cluster, traits, input));
+
+        fragments.add(fragment);
+
+        return new IgniteReceiver(cluster, traits, input.getRowType(), fragment);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index b651406..397a9ef 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -118,9 +118,10 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
 
         typeFactory = ctx.typeFactory();
         catalogReader = ctx.catalogReader();
+        operatorTable = ctx.opTable();
+        conformance = ctx.conformance();
         frameworkConfig = ctx.config();
 
-        operatorTable = frameworkConfig.getOperatorTable();
         programs = frameworkConfig.getPrograms();
         parserConfig = frameworkConfig.getParserConfig();
         sqlToRelConverterConfig = frameworkConfig.getSqlToRelConverterConfig();
@@ -129,7 +130,6 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         traitDefs = frameworkConfig.getTraitDefs();
 
         rexBuilder = new RexBuilder(typeFactory);
-        conformance = ctx.connectionConfig().conformance();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
deleted file mode 100644
index 1f1cbf0..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.prepare;
-
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-
-/**
- *
- */
-public interface RelSource {
-    /**
-     * @return ID of a fragment, where data comes from.
-     */
-    long fragmentId();
-
-    /**
-     * Returns source mapping. The mapping contains nodes where data comes from.
-     * It's used to determine that all sources sent all related data.
-     *
-     * @return Source mapping.
-     */
-    NodesMapping mapping();
-
-    /**
-     * Binds a source to target and starts source data location calculation.
-     * After this method call the source knows where to send data and the target knows where to expect data from.
-     * @param target Target.
-     * @param mappingService
-     * @param ctx Context.
-     * @param mq Metadata query instance.
-     */
-    default void bindToTarget(RelTarget target, MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) {
-        // No-op
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
index 1dcb6b4..904b468 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
@@ -17,16 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.io.Serializable;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
 /**
  *
  */
-public interface RelTarget {
+public interface RelTarget extends Serializable {
     /**
      * @return Target fragment ID.
      */
@@ -39,9 +39,4 @@ public interface RelTarget {
      * @return Target mapping.
      */
     NodesMapping mapping();
-
-    /**
-     * @return Target distribution.
-     */
-    IgniteDistribution distribution();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetAware.java
similarity index 56%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetAware.java
index 8afbb01..a879b30 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetAware.java
@@ -17,35 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import java.io.Serializable;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-
 /**
- *
+ * Describes an object, that needs information about physical target distribution.
  */
-public class RelSourceImpl implements RelSource, Serializable {
-    /** */
-    private final long fragmentId;
-
-    /** */
-    private final NodesMapping mapping;
-
+public interface RelTargetAware {
     /**
-     * @param fragmentId Fragment ID.
-     * @param mapping Source mapping.
+     * @param target Remote target information.
      */
-    public RelSourceImpl(long fragmentId, NodesMapping mapping) {
-        this.fragmentId = fragmentId;
-        this.mapping = mapping;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long fragmentId() {
-        return fragmentId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public NodesMapping mapping() {
-        return mapping;
-    }
+    void target(RelTarget target);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
index 570b81f..1c27bde 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
@@ -17,32 +17,25 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import java.io.Serializable;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
 /**
  *
  */
-public class RelTargetImpl implements RelTarget, Serializable {
+public class RelTargetImpl implements RelTarget {
     /** */
     private final long fragmentId;
 
     /** */
     private final NodesMapping mapping;
 
-    /** */
-    private final IgniteDistribution distribution;
-
     /**
      * @param fragmentId Target fragment ID.
      * @param mapping Target mapping.
-     * @param distribution Target distribution.
      */
-    public RelTargetImpl(long fragmentId, NodesMapping mapping, IgniteDistribution distribution) {
+    public RelTargetImpl(long fragmentId, NodesMapping mapping) {
         this.fragmentId = fragmentId;
         this.mapping = mapping;
-        this.distribution = distribution;
     }
 
     /** {@inheritDoc} */
@@ -54,9 +47,4 @@ public class RelTargetImpl implements RelTarget, Serializable {
     @Override public NodesMapping mapping() {
         return mapping;
     }
-
-    /** {@inheritDoc} */
-    @Override public IgniteDistribution distribution() {
-        return distribution;
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index b376c58..00cac1a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -108,14 +107,11 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
         RelOptCluster cluster = rel.getCluster();
         IgniteRel input = visit((IgniteRel) rel.getInput());
 
-        RelTraitSet inTraits = input.getTraitSet();
-        RelTraitSet outTraits = rel.getTraitSet();
-
-        Fragment fragment = new Fragment(new IgniteSender(cluster, inTraits, input));
+        Fragment fragment = new Fragment(new IgniteSender(cluster, rel.getTraitSet(), input));
 
         fragments.add(fragment);
 
-        return new IgniteReceiver(cluster, outTraits, input.getRowType(), fragment);
+        return new IgniteReceiver(cluster, rel.getTraitSet(), input.getRowType(), fragment);
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 78083a1..0e474e3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
  */
 public class IgniteReceiver extends AbstractRelNode implements IgniteRel {
     /** */
-    private final RelSource source;
+    private final Fragment source;
 
     /**
      * Creates a Receiver
@@ -42,9 +42,9 @@ public class IgniteReceiver extends AbstractRelNode implements IgniteRel {
      * @param cluster  Cluster that this relational expression belongs to
      * @param traits   Traits of this relational expression
      * @param rowType  Output row type
-     * @param source   Remote sources information.
+     * @param source   Source fragment.
      */
-    public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, RelSource source) {
+    public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Fragment source) {
         super(cluster, traits);
 
         this.rowType = rowType;
@@ -62,9 +62,9 @@ public class IgniteReceiver extends AbstractRelNode implements IgniteRel {
     }
 
     /**
-     * @return Remote sources information.
+     * @return Source fragment.
      */
-    public RelSource source() {
+    public Fragment source() {
         return source;
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index e89bece..403f34b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -20,15 +20,20 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTargetAware;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
 /**
  * Relational expression that iterates over its input
  * and sends elements to remote {@link IgniteReceiver}
  */
-public class IgniteSender extends SingleRel implements IgniteRel {
+public class IgniteSender extends SingleRel implements IgniteRel, RelTargetAware {
     /** */
     private RelTarget target;
 
@@ -67,6 +72,11 @@ public class IgniteSender extends SingleRel implements IgniteRel {
         return visitor.visit(this);
     }
 
+    /** {@inheritDoc} */
+    @Override public void target(RelTarget target) {
+        this.target = target;
+    }
+
     /**
      * @return Remote targets information.
      */
@@ -75,9 +85,23 @@ public class IgniteSender extends SingleRel implements IgniteRel {
     }
 
     /**
-     * @param target Remote targets information.
+     * @return Node distribution.
      */
-    public void target(RelTarget target) {
-        this.target = target;
+    public IgniteDistribution targetDistribution() {
+        return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+    }
+
+    /**
+     * @return Node distribution.
+     */
+    public IgniteDistribution sourceDistribution() {
+        return input.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+    }
+
+    /**
+     * @return Node collations.
+     */
+    public List<RelCollation> collations() {
+        return getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
index 55a6e24..9a79bbc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
@@ -56,7 +56,7 @@ public class JoinConverter extends IgniteConverter {
         RelMetadataQuery mq = cluster.getMetadataQuery();
 
         List<IgniteDistribution> leftTraits = IgniteMdDerivedDistribution.deriveDistributions(left, IgniteConvention.INSTANCE, mq);
-        List<IgniteDistribution> rightTraits = IgniteMdDerivedDistribution.deriveDistributions(left, IgniteConvention.INSTANCE, mq);
+        List<IgniteDistribution> rightTraits = IgniteMdDerivedDistribution.deriveDistributions(right, IgniteConvention.INSTANCE, mq);
 
         List<IgniteDistributions.BiSuggestion> suggestions = IgniteDistributions.suggestJoin(leftTraits, rightTraits, join.analyzeCondition(), join.getJoinType());
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
index c11b793..04a32af 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
@@ -64,12 +64,12 @@ public class RelToPhysicalConverter implements IgniteRelVisitor<PhysicalRel> {
 
     /** {@inheritDoc} */
     @Override public PhysicalRel visit(IgniteSender rel) {
-        long targetFragmentId = rel.target().fragmentId();
-        NodesMapping targetMapping = rel.target().mapping();
-        DistributionFunction targetFunction = rel.target().distribution().function();
-        ImmutableIntList distributionKeys = rel.target().distribution().getKeys();
+        long fragmentId = rel.target().fragmentId();
+        NodesMapping mapping = rel.target().mapping();
+        DistributionFunction fun = rel.targetDistribution().function();
+        ImmutableIntList keys = rel.targetDistribution().getKeys();
 
-        return new SenderPhysicalRel(targetFragmentId, targetMapping, targetFunction, distributionKeys, visit((IgniteRel) rel.getInput()));
+        return new SenderPhysicalRel(fragmentId, mapping, fun, keys, visit((IgniteRel) rel.getInput()));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
index 5ee9192..085ce43 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
@@ -59,6 +59,18 @@ public abstract class DistributionFunction implements Serializable {
     public abstract Destination destination(PartitionService partitionService, NodesMapping mapping, ImmutableIntList keys);
 
     /**
+     * Creates a partition.
+     *
+     * @param partitionService Affinity function source.
+     * @param partitionsCount Expected partitions count.
+     * @param keys Distribution keys.
+     * @return Partition function.
+     */
+    public ToIntFunction<Object> partitionFunction(PartitionService partitionService, int partitionsCount, ImmutableIntList keys) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
      * @return Function name. This name used for equality checking and in {@link RelNode#getDigest()}.
      */
     public final String name(){
@@ -208,10 +220,12 @@ public abstract class DistributionFunction implements Serializable {
                     assert F.isEmpty(assignment) || assignment.size() == 1;
             }
 
-            ToIntFunction<Object> rowToPart = DistributionFunction.rowToPart(
-                partitionService.partitionFunction(CU.UNDEFINED_CACHE_ID), k.toIntArray());
+            return new Partitioned(m.nodes(), assignments, partitionFunction(partitionService, assignments.size(), k));
+        }
 
-            return new Partitioned(m.nodes(), assignments, rowToPart);
+        /** {@inheritDoc} */
+        @Override public ToIntFunction<Object> partitionFunction(PartitionService partitionService, int partitionsCount, ImmutableIntList k) {
+            return DistributionFunction.rowToPart(partitionService.partitionFunction(CU.UNDEFINED_CACHE_ID), partitionsCount, k.toIntArray());
         }
 
         /** */
@@ -253,10 +267,12 @@ public abstract class DistributionFunction implements Serializable {
                     assert F.isEmpty(assignment) || assignment.size() == 1;
             }
 
-            ToIntFunction<Object> rowToPart = DistributionFunction.rowToPart(
-                partitionService.partitionFunction(cacheId), k.toIntArray());
+            return new Partitioned(m.nodes(), assignments, partitionFunction(partitionService, assignments.size(), k));
+        }
 
-            return new Partitioned(m.nodes(), assignments, rowToPart);
+        /** {@inheritDoc} */
+        @Override public ToIntFunction<Object> partitionFunction(PartitionService partitionService, int partitionsCount, ImmutableIntList k) {
+            return DistributionFunction.rowToPart(partitionService.partitionFunction(cacheId), partitionsCount, k.toIntArray());
         }
 
         /** {@inheritDoc} */
@@ -266,7 +282,7 @@ public abstract class DistributionFunction implements Serializable {
     }
 
     /** */
-    private static ToIntFunction<Object> rowToPart(ToIntFunction<Object> keyToPart, int[] keys) {
+    private static ToIntFunction<Object> rowToPart(ToIntFunction<Object> keyToPart, int parts, int[] keys) {
         return r -> {
             Object[] row = (Object[]) r;
 
@@ -278,7 +294,7 @@ public abstract class DistributionFunction implements Serializable {
             for (int i = 1; i < keys.length; i++)
                 hash = 31 * hash + keyToPart.applyAsInt(row[keys[i]]);
 
-            return hash;
+            return U.safeAbs(hash) % parts;
         };
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index e33de68..b97794b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -20,10 +20,8 @@ package org.apache.ignite.internal.processors.query.calcite.trait;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Exchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 
 /**
@@ -33,14 +31,17 @@ public class DistributionTraitDef extends RelTraitDef<IgniteDistribution> {
     /** */
     public static final DistributionTraitDef INSTANCE = new DistributionTraitDef();
 
+    /** {@inheritDoc} */
     @Override public Class<IgniteDistribution> getTraitClass() {
         return IgniteDistribution.class;
     }
 
+    /** {@inheritDoc} */
     @Override public String getSimpleName() {
         return "distr";
     }
 
+    /** {@inheritDoc} */
     @Override public RelNode convert(RelOptPlanner planner, RelNode rel, IgniteDistribution targetDist, boolean allowInfiniteCostConverters) {
         if (rel.getConvention() == Convention.NONE)
             return null;
@@ -54,14 +55,8 @@ public class DistributionTraitDef extends RelTraitDef<IgniteDistribution> {
             case HASH_DISTRIBUTED:
             case BROADCAST_DISTRIBUTED:
             case SINGLETON:
-                Exchange exchange = new IgniteExchange(rel.getCluster(), rel.getTraitSet().replace(targetDist), rel, targetDist);
-                RelNode newRel = planner.register(exchange, rel);
-                RelTraitSet newTraits = rel.getTraitSet().replace(targetDist);
-
-                if (!newRel.getTraitSet().equals(newTraits))
-                    newRel = planner.changeTraits(newRel, newTraits);
-
-                return newRel;
+                return register(planner, rel,
+                    new IgniteExchange(rel.getCluster(), rel.getTraitSet().replace(targetDist), rel, targetDist));
             case ANY:
                 return rel;
             default:
@@ -69,11 +64,23 @@ public class DistributionTraitDef extends RelTraitDef<IgniteDistribution> {
         }
     }
 
+    /** {@inheritDoc} */
     @Override public boolean canConvert(RelOptPlanner planner, IgniteDistribution fromTrait, IgniteDistribution toTrait) {
         return true;
     }
 
+    /** {@inheritDoc} */
     @Override public IgniteDistribution getDefault() {
         return IgniteDistributions.any();
     }
+
+    /** */
+    private RelNode register(RelOptPlanner planner, RelNode rel, RelNode replace) {
+        RelNode registered = planner.register(replace, rel);
+
+        if (!registered.getTraitSet().equals(replace.getTraitSet()))
+            registered = planner.changeTraits(registered, replace.getTraitSet());
+
+        return registered;
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 5855442..4fe2777 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -218,10 +219,10 @@ public class IgniteDistributions {
         if (joinType == LEFT || joinType == RIGHT || (joinType == INNER && !F.isEmpty(joinInfo.pairs()))) {
             HashSet<DistributionFunction> factories = U.newHashSet(3);
 
-            if (leftIn.getKeys().equals(joinInfo.leftKeys))
+            if (Objects.equals(joinInfo.leftKeys, leftIn.getKeys()))
                 factories.add(leftIn.function());
 
-            if (rightIn.getKeys().equals(joinInfo.rightKeys))
+            if (Objects.equals(joinInfo.rightKeys, rightIn.getKeys()))
                 factories.add(rightIn.function());
 
             factories.add(HashDistribution.INSTANCE);
@@ -258,10 +259,10 @@ public class IgniteDistributions {
         IgniteDistribution newLeft, IgniteDistribution newRight) {
         int exch = 0;
 
-        if (!left.satisfies(newLeft))
+        if (needsExchange(left, newLeft))
             exch++;
 
-        if (!right.satisfies(newRight))
+        if (needsExchange(right, newRight))
             exch++;
 
         dst.add(new BiSuggestion(out, newLeft, newRight, exch));
@@ -270,6 +271,11 @@ public class IgniteDistributions {
     }
 
     /** */
+    private static boolean needsExchange(IgniteDistribution sourceDist, IgniteDistribution targetDist) {
+        return !sourceDist.satisfies(targetDist);
+    }
+
+    /** */
     @SuppressWarnings("SameParameterValue")
     private static List<BiSuggestion> topN(ArrayList<BiSuggestion> src, int n) {
         Collections.sort(src);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/Partitioned.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/Partitioned.java
index cd806ba..3fd0aed 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/Partitioned.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/Partitioned.java
@@ -41,7 +41,7 @@ public final class Partitioned implements Destination {
 
     /** {@inheritDoc} */
     @Override public List<UUID> targets(Object row) {
-        return assignments.get(partFun.applyAsInt(row) % assignments.size());
+        return assignments.get(partFun.applyAsInt(row));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index c6b07e5..01c63b6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.linq4j.tree.Primitive;
@@ -32,6 +33,7 @@ import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridKernalContext;
@@ -65,18 +67,22 @@ public final class Commons {
     public static <T> List<T> intersect(List<T> left, List<T> right) {
         if (F.isEmpty(left) || F.isEmpty(right))
             return Collections.emptyList();
-        else if (left.size() > right.size())
-            return intersect0(right, left);
-        else
-            return intersect0(left, right);
+
+        return intersect0(left, right);
     }
 
-    /** */
-    private static <T> List<T> intersect0(List<T> left, List<T> right) {
-        List<T> res = new ArrayList<>(Math.min(left.size(), right.size()));
-        HashSet<T> set = new HashSet<>(left);
+    /**
+     * Intersects a set and a list.
+     *
+     * @return A List of unique entries that presented in both the given set and the given list.
+     */
+    public static <T> List<T> intersect(Set<T> set, List<T> list) {
+        if (F.isEmpty(set) || F.isEmpty(list))
+            return Collections.emptyList();
 
-        for (T t : right) {
+        List<T> res = new ArrayList<>(Math.min(set.size(), list.size()));
+
+        for (T t : list) {
             if (set.contains(t))
                 res.add(t);
         }
@@ -84,6 +90,14 @@ public final class Commons {
         return res;
     }
 
+    /** */
+    private static <T> List<T> intersect0(@NotNull List<T> left, @NotNull List<T> right) {
+        if (left.size() > right.size())
+            return intersect0(right, left);
+
+        return intersect(new HashSet<>(left), right);
+    }
+
     /**
      * Returns a given list as a typed list.
      */
@@ -201,10 +215,21 @@ public final class Commons {
 
     /** */
     public static RelDataType combinedRowType(IgniteTypeFactory typeFactory, RelDataType... types) {
-        final RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+
+        Set<String> names = new HashSet<>();
 
-        for (RelDataType type : types)
-            builder.addAll(type.getFieldList());
+        for (RelDataType type : types) {
+            for (RelDataTypeField field : type.getFieldList()) {
+                int idx = 0;
+                String fieldName = field.getName();
+
+                while (!names.add(fieldName))
+                    fieldName = field.getName() + idx++;
+
+                builder.add(fieldName, field.getType());
+            }
+        }
 
         return builder.build();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
index 118b5c0..2fbf47a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
@@ -195,7 +195,7 @@ public class TableScan implements Iterable<Object[]> {
         private void reserveReplicated(GridDhtPartitionTopology top) {
             List<GridDhtLocalPartition> localParts = top.localPartitions();
 
-            parts = new ArrayDeque<>(localParts);
+            parts = new ArrayDeque<>(localParts.size());
 
             for (GridDhtLocalPartition local : localParts) {
                 if (!local.reserve())
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index c54eec9..5af47fb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -93,6 +94,44 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     }
 
     @Test
+    public void query2() throws Exception {
+        IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
+            .setName("developer")
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(Integer.class, Developer.class)
+            .setCacheMode(CacheMode.REPLICATED)
+        );
+
+        IgniteCache<Integer, Project> project = ignite.getOrCreateCache(new CacheConfiguration<Integer, Project>()
+            .setName("project")
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(Integer.class, Project.class)
+            .setBackups(2)
+        );
+
+        waitForReadyTopology(internalCache(project).context().topology(), new AffinityTopologyVersion(5, 3));
+
+        project.putAll(ImmutableMap.of(
+            0, new Project("Ignite"),
+            1, new Project("Calcite")
+        ));
+
+        developer.putAll(ImmutableMap.of(
+            0, new Developer("Igor", 1),
+            1, new Developer("Roman", 0)
+        ));
+
+        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
+            "select * from DEVELOPER d, PROJECT p where d.projectId = p._key and d._key = ?", 0);
+
+        assertEquals(1, query.size());
+
+        assertEqualsCollections(Arrays.asList("Igor", 1, "Calcite"), F.first(query.get(0).getAll()));
+    }
+
+    @Test
     public void queryMultiStatement() throws Exception {
         IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
             .setName("developer")