You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/19 08:40:57 UTC

[ignite] branch sql-calcite updated: IGNITE-14281 Calcite. Colocated tables are considered non colocated

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new faf2c5e  IGNITE-14281 Calcite. Colocated tables are considered non colocated
faf2c5e is described below

commit faf2c5e319e2beccb5f0000d38476d1f59a4a646
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Fri Mar 19 11:40:36 2021 +0300

    IGNITE-14281 Calcite. Colocated tables are considered non colocated
---
 .../query/calcite/prepare/PlannerPhase.java        |   5 +-
 .../query/calcite/rel/AbstractIgniteJoin.java      | 124 +++++----
 .../query/calcite/rel/IgniteAggregateBase.java     |   2 +-
 .../query/calcite/rel/IgniteMergeJoin.java         |  32 +--
 .../calcite/rule/logical/FilterScanMergeRule.java  |   9 +-
 .../query/calcite/schema/TableDescriptorImpl.java  |  37 ++-
 .../query/calcite/trait/DistributionFunction.java  |   7 +-
 .../query/calcite/trait/DistributionTrait.java     |  35 +--
 .../query/calcite/trait/IgniteDistributions.java   |  10 +
 .../processors/query/calcite/trait/TraitUtils.java |  30 ++-
 .../calcite/AbstractBasicIntegrationTest.java      |   2 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  60 +++++
 .../processors/query/calcite/QueryChecker.java     |   1 +
 .../query/calcite/planner/AbstractPlannerTest.java |  14 +
 .../calcite/planner/JoinColocationPlannerTest.java | 288 +++++++++++++++++++++
 .../apache/ignite/testsuites/PlannerTestSuite.java |   2 +
 .../processors/query/GridQueryProcessor.java       |   2 +-
 .../processors/query/h2/opt/GridH2Table.java       |  15 --
 18 files changed, 514 insertions(+), 161 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 58d8667..6f21db7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -155,15 +155,16 @@ public enum PlannerPhase {
                     FilterScanMergeRule.INDEX_SCAN,
 
                     LogicalOrToUnionRule.INSTANCE,
+
                     CorrelatedNestedLoopJoinRule.INSTANCE,
+                    NestedLoopJoinConverterRule.INSTANCE,
+                    MergeJoinConverterRule.INSTANCE,
 
                     ValuesConverterRule.INSTANCE,
                     LogicalScanConverterRule.INDEX_SCAN,
                     LogicalScanConverterRule.TABLE_SCAN,
                     HashAggregateConverterRule.INSTANCE,
                     SortAggregateConverterRule.INSTANCE,
-                    MergeJoinConverterRule.INSTANCE,
-                    NestedLoopJoinConverterRule.INSTANCE,
                     ProjectConverterRule.INSTANCE,
                     FilterConverterRule.INSTANCE,
                     TableModifyConverterRule.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
index cb54815..956ae76 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteJoin.java
@@ -18,9 +18,10 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
@@ -38,8 +39,10 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
@@ -50,12 +53,11 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.rel.core.JoinRelType.INNER;
-import static org.apache.calcite.rel.core.JoinRelType.LEFT;
 import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
 import static org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdRowCount.joinRowCount;
 import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.broadcast;
 import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.hash;
+import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.random;
 import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 
 /** */
@@ -110,27 +112,23 @@ public abstract class AbstractIgniteJoin extends Join implements TraitsAwareIgni
         RewindabilityTrait leftRewindability = TraitUtils.rewindability(left);
         RewindabilityTrait rightRewindability = TraitUtils.rewindability(right);
 
-        RelTraitSet outTraits, leftTraits, rightTraits;
+        List<Pair<RelTraitSet, List<RelTraitSet>>> pairs = new ArrayList<>();
 
-        if (leftRewindability.rewindable() && rightRewindability.rewindable()) {
-            outTraits = nodeTraits.replace(RewindabilityTrait.REWINDABLE);
-            leftTraits = left.replace(RewindabilityTrait.REWINDABLE);
-            rightTraits = right.replace(RewindabilityTrait.REWINDABLE);
-        }
-        else {
-            outTraits = nodeTraits.replace(RewindabilityTrait.ONE_WAY);
-            leftTraits = left.replace(RewindabilityTrait.ONE_WAY);
-            rightTraits = right.replace(RewindabilityTrait.ONE_WAY);
-        }
+        pairs.add(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            ImmutableList.of(left.replace(RewindabilityTrait.ONE_WAY), right.replace(RewindabilityTrait.ONE_WAY))));
 
-        return ImmutableList.of(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+        if (leftRewindability.rewindable() && rightRewindability.rewindable())
+            pairs.add(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE),
+                ImmutableList.of(left.replace(RewindabilityTrait.REWINDABLE), right.replace(RewindabilityTrait.REWINDABLE))));
+
+        return ImmutableList.copyOf(pairs);
     }
 
     /** {@inheritDoc} */
     @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // Tere are several rules:
+        // There are several rules:
         // 1) any join is possible on broadcast or single distribution
-        // 2) hash distributed join is possible when join keys equal to source distribution keys
+        // 2) hash distributed join is possible when join keys are superset of source distribution keys
         // 3) hash and broadcast distributed tables can be joined when join keys equal to hash
         //    distributed table distribution keys and:
         //      3.1) it's a left join and a hash distributed table is at left
@@ -144,71 +142,55 @@ public abstract class AbstractIgniteJoin extends Join implements TraitsAwareIgni
         IgniteDistribution leftDistr = TraitUtils.distribution(left);
         IgniteDistribution rightDistr = TraitUtils.distribution(right);
 
-        RelTraitSet outTraits, leftTraits, rightTraits;
+        IgniteDistribution left2rightProjectedDistr = leftDistr.apply(buildProjectionMapping(true));
+        IgniteDistribution right2leftProjectedDistr = rightDistr.apply(buildProjectionMapping(false));
+
+        RelTraitSet outTraits;
+        RelTraitSet leftTraits;
+        RelTraitSet rightTraits;
 
-        if (leftDistr == broadcast() || rightDistr == broadcast()) {
+        if (leftDistr == broadcast() && rightDistr == broadcast()) {
             outTraits = nodeTraits.replace(broadcast());
             leftTraits = left.replace(broadcast());
             rightTraits = right.replace(broadcast());
-
-            res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
         }
-
-        if (leftDistr == single() || rightDistr == single()) {
+        else {
             outTraits = nodeTraits.replace(single());
             leftTraits = left.replace(single());
             rightTraits = right.replace(single());
-
-            res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
         }
 
-        if (!F.isEmpty(joinInfo.pairs())) {
-            Set<DistributionFunction> functions = new HashSet<>();
+        res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
 
-            if (leftDistr.getType() == HASH_DISTRIBUTED
-                && Objects.equals(joinInfo.leftKeys, leftDistr.getKeys()))
-                functions.add(leftDistr.function());
+        if (F.isEmpty(joinInfo.pairs()))
+            return ImmutableList.copyOf(res);
 
-            if (rightDistr.getType() == HASH_DISTRIBUTED
-                && Objects.equals(joinInfo.rightKeys, rightDistr.getKeys()))
-                functions.add(rightDistr.function());
+        if (leftDistr.getType() == HASH_DISTRIBUTED && left2rightProjectedDistr != random()) {
+            outTraits = nodeTraits.replace(leftDistr);
+            leftTraits = left.replace(leftDistr);
+            rightTraits = right.replace(left2rightProjectedDistr);
 
-            functions.add(DistributionFunction.hash());
-
-            for (DistributionFunction function : functions) {
-                leftTraits = left.replace(hash(joinInfo.leftKeys, function));
-                rightTraits = right.replace(hash(joinInfo.rightKeys, function));
+            res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+        }
 
-                // TODO distribution multitrait support
-                outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, function));
-                res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+        if (rightDistr.getType() == HASH_DISTRIBUTED && right2leftProjectedDistr != random()) {
+            outTraits = nodeTraits.replace(rightDistr);
+            leftTraits = left.replace(right2leftProjectedDistr);
+            rightTraits = right.replace(rightDistr);
 
-                outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, function));
-                res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+            res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
+        }
 
-                if (joinType == INNER || joinType == LEFT) {
-                    outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, function));
-                    leftTraits = left.replace(hash(joinInfo.leftKeys, function));
-                    rightTraits = right.replace(broadcast());
+        leftTraits = left.replace(hash(joinInfo.leftKeys, DistributionFunction.hash()));
+        rightTraits = right.replace(hash(joinInfo.rightKeys, DistributionFunction.hash()));
 
-                    res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-                }
+        outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, DistributionFunction.hash()));
+        res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
 
-                if (joinType == INNER || joinType == RIGHT) {
-                    outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, function));
-                    leftTraits = left.replace(broadcast());
-                    rightTraits = right.replace(hash(joinInfo.rightKeys, function));
+        outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, DistributionFunction.hash()));
+        res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
 
-                    res.add(Pair.of(outTraits, ImmutableList.of(leftTraits, rightTraits)));
-                }
-            }
-        }
-
-        if (!res.isEmpty())
-            return res;
-
-        return ImmutableList.of(Pair.of(nodeTraits.replace(single()),
-            ImmutableList.of(left.replace(single()), right.replace(single()))));
+        return ImmutableList.copyOf(res);
     }
 
     /** {@inheritDoc} */
@@ -316,4 +298,20 @@ public abstract class AbstractIgniteJoin extends Join implements TraitsAwareIgni
         }
         return true;
     }
+
+    /** Creates mapping from left join keys to the right and vice versa with regards to {@code left2Right}. */
+    protected Mappings.TargetMapping buildProjectionMapping(boolean left2Right) {
+        ImmutableIntList sourceKeys = left2Right ? joinInfo.leftKeys : joinInfo.rightKeys;
+        ImmutableIntList targetKeys = left2Right ? joinInfo.rightKeys : joinInfo.leftKeys;
+
+        Map<Integer, Integer> keyMap = new HashMap<>();
+        for (int i = 0; i < joinInfo.leftKeys.size(); i++)
+            keyMap.put(sourceKeys.get(i), targetKeys.get(i));
+
+        return Mappings.target(
+            keyMap,
+            (left2Right ? left : right).getRowType().getFieldCount(),
+            (left2Right ? right : left).getRowType().getFieldCount()
+        );
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
index d2a40a4..32ef760 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregateBase.java
@@ -183,7 +183,7 @@ public abstract class IgniteAggregateBase extends IgniteAggregate implements Tra
 
                     //Check that group by contains all key columns
                     if (groupSet.cardinality() == keys.size()) {
-                        Mappings.TargetMapping mapping = Commons.inverseMapping(
+                        Mappings.TargetMapping mapping = Commons.mapping(
                             groupSet, getInput().getRowType().getFieldCount());
 
                         IgniteDistribution outDistr = distribution.apply(mapping);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
index ecddf42..38b6811 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
@@ -31,6 +31,7 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
@@ -137,36 +138,17 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
 
         List<Integer> newLeftCollation, newRightCollation;
 
-        if (isPrefix(leftCollation.getKeys(), joinInfo.leftKeys)) { // preserve left collation
-            newLeftCollation = new ArrayList<>(leftCollation.getKeys());
+        if (isPrefix(leftCollation.getKeys(), joinInfo.leftKeys)) // preserve left collation
+            rightCollation = leftCollation.apply(buildProjectionMapping(true));
 
-            Map<Integer, Integer> leftToRight = joinInfo.pairs().stream()
-                .collect(Collectors.toMap(p -> p.source, p -> p.target));
+        else if (isPrefix(rightCollation.getKeys(), joinInfo.rightKeys))// preserve right collation
+            leftCollation = rightCollation.apply(buildProjectionMapping(false));
 
-            newRightCollation = newLeftCollation.stream()
-                .limit(joinInfo.pairs().size())
-                .map(leftToRight::get)
-                .collect(Collectors.toList());
-        }
-        else if (isPrefix(rightCollation.getKeys(), joinInfo.rightKeys)) { // preserve right collation
-            newRightCollation = new ArrayList<>(rightCollation.getKeys());
-
-            Map<Integer, Integer> rightToLeft = joinInfo.pairs().stream()
-                .collect(Collectors.toMap(p -> p.target, p -> p.source));
-
-            newLeftCollation = newRightCollation.stream()
-                .limit(joinInfo.pairs().size())
-                .map(rightToLeft::get)
-                .collect(Collectors.toList());
-        }
         else { // generate new collations
-            newLeftCollation = new ArrayList<>(joinInfo.leftKeys);
-            newRightCollation = new ArrayList<>(joinInfo.rightKeys);
+            leftCollation = RelCollations.of(joinInfo.leftKeys);
+            rightCollation = RelCollations.of(joinInfo.rightKeys);
         }
 
-        leftCollation = createCollation(newLeftCollation);
-        rightCollation = createCollation(newRightCollation);
-
         return ImmutableList.of(
             Pair.of(
                 nodeTraits.replace(leftCollation),
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
index d3f033c..284d92d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
@@ -103,12 +103,6 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
     }
 
     /** {@inheritDoc} */
-    @Override public boolean matches(RelOptRuleCall call) {
-        T rel = call.rel(1);
-        return rel.condition() == null;
-    }
-
-    /** {@inheritDoc} */
     @Override public void onMatch(RelOptRuleCall call) {
         LogicalFilter filter = call.rel(0);
         T scan = call.rel(1);
@@ -119,6 +113,9 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
         RexNode condition = filter.getCondition();
         RexNode remaining = null;
 
+        if (scan.condition() != null)
+            condition = RexUtil.composeConjunction(builder, F.asList(scan.condition(), condition));
+
         if (scan.projects() != null) {
             IgniteTypeFactory typeFactory = Commons.typeFactory(scan);
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index c14a488..2d3881c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
@@ -36,6 +38,7 @@ import org.apache.calcite.schema.ColumnStrategy;
 import org.apache.calcite.sql2rel.InitializerContext;
 import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -101,7 +104,7 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
     private final int valField;
 
     /** */
-    private final int affField;
+    private final ImmutableIntList affFields;
 
     /** */
     private final ImmutableBitSet insertFields;
@@ -129,16 +132,9 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
 
         int keyField = QueryUtils.KEY_COL;
         int valField = QueryUtils.VAL_COL;
-        int affField = QueryUtils.KEY_COL;
 
         for (String field : fields) {
-            if (Objects.equals(field, typeDesc.affinityKey()))
-                affField = descriptors.size();
-
             if (Objects.equals(field, typeDesc.keyFieldAlias())) {
-                if (typeDesc.affinityKey() == null)
-                    affField = descriptors.size();
-
                 keyField = descriptors.size();
 
                 virtualFields.set(0);
@@ -165,12 +161,27 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
         for (ColumnDescriptor descriptor : descriptors)
             descriptorsMap.put(descriptor.name(), descriptor);
 
-        if (TypeUtils.isConvertableType(descriptors.get(affField).storageType()))
-            affField = -1;
+        List<Integer> affFields = new ArrayList<>();
+        if (!F.isEmpty(typeDesc.affinityKey()))
+            affFields.add(descriptorsMap.get(typeDesc.affinityKey()).fieldIndex());
+        else if (!F.isEmpty(typeDesc.keyFieldAlias()))
+            affFields.add(descriptorsMap.get(typeDesc.keyFieldAlias()).fieldIndex());
+        else {
+            affFields.addAll(
+                descriptors.stream()
+                    .filter(desc -> typeDesc.primaryKeyFields().contains(desc.name()))
+                    .map(ColumnDescriptor::fieldIndex)
+                    .collect(Collectors.toList())
+            );
+        }
+
+        if (affFields.stream().map(descriptors::get).map(ColumnDescriptor::storageType)
+            .anyMatch(TypeUtils::isConvertableType))
+            affFields.clear();
 
         this.keyField = keyField;
         this.valField = valField;
-        this.affField = affField;
+        this.affFields = ImmutableIntList.copyOf(affFields);
         this.descriptors = descriptors.toArray(DUMMY);
         this.descriptorsMap = descriptorsMap;
 
@@ -197,10 +208,10 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
     @Override public IgniteDistribution distribution() {
         if (affinityIdentity == null)
             return IgniteDistributions.broadcast();
-        else if (affField == -1)
+        else if (affFields.isEmpty())
             return IgniteDistributions.random();
         else
-            return IgniteDistributions.affinity(affField, cacheInfo.cacheId(), affinityIdentity);
+            return IgniteDistributions.affinity(affFields, cacheInfo.cacheId(), affinityIdentity);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
index 6d79018..b1724a2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
@@ -141,11 +141,8 @@ public abstract class DistributionFunction {
         if (f0 == f1 || f0.name() == f1.name())
             return true;
 
-        if (f0 instanceof AffinityDistribution && f1 instanceof AffinityDistribution &&
-            Objects.equals(((AffinityDistribution)f0).identity(), ((AffinityDistribution)f1).identity()))
-            return true;
-
-        return false;
+        return f0 instanceof AffinityDistribution && f1 instanceof AffinityDistribution &&
+            Objects.equals(((AffinityDistribution)f0).identity(), ((AffinityDistribution)f1).identity());
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index fb5a9ee..7c1f4a0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.List;
 import java.util.Objects;
 import com.google.common.collect.Ordering;
 import org.apache.calcite.plan.RelMultipleTrait;
@@ -24,6 +25,7 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.mapping.Mapping;
 import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
@@ -150,35 +152,14 @@ public final class DistributionTrait implements IgniteDistribution {
         if (getType() != HASH_DISTRIBUTED)
             return this;
 
-        if (mapping.getTargetCount() < keys.size())
-            return IgniteDistributions.random();
-
-        int[] map = new int[mapping.getSourceCount()];
-        int[] res = new int[keys.size()];
-
-        for (int i = 0; i < keys.size(); i++)
-            map[keys.getInt(i)] = i + 1;
-
-        for (int i = 0, found = 0; i < mapping.getTargetCount(); i++) {
-            int source = mapping.getSourceOpt(i);
-
-            if (source == -1)
-                continue;
-
-            int keyPos = map[source] - 1;
-
-            if (keyPos == -1)
-                continue;
-
-            res[keyPos] = i;
-
-            if (++found == keys.size())
-                return IgniteDistributions.hash(ImmutableIntList.of(res), function);
-
-            map[source] = 0;
+        for (int key : keys) {
+            if (mapping.getTargetOpt(key) == -1)
+                return IgniteDistributions.random(); // Some distribution keys are not mapped => any.
         }
 
-        return IgniteDistributions.random();
+        List<Integer> res = Mappings.apply2((Mapping) mapping, keys);
+
+        return IgniteDistributions.hash(ImmutableIntList.copyOf(res), function);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 0690124..8af167b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -87,6 +87,16 @@ public class IgniteDistributions {
     }
 
     /**
+     * @param keys Affinity keys.
+     * @param cacheId Affinity cache ID.
+     * @param identity Affinity identity key.
+     * @return Affinity distribution.
+     */
+    public static IgniteDistribution affinity(ImmutableIntList keys, int cacheId, Object identity) {
+        return hash(keys, DistributionFunction.affinity(cacheId, identity));
+    }
+
+    /**
      * @param keys Distribution keys.
      * @return Hash distribution.
      */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 602cb78..6cd77f1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -20,13 +20,16 @@ package org.apache.ignite.internal.processors.query.calcite.trait;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
@@ -43,6 +46,7 @@ import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ControlFlowException;
@@ -64,7 +68,6 @@ import static java.util.Collections.singletonList;
 import static org.apache.calcite.plan.RelOptUtil.permutationPushDownProject;
 import static org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.calcite.rel.core.Project.getPartialMapping;
 import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.any;
 import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions.single;
 
@@ -142,6 +145,11 @@ public class TraitUtils {
         if (fromTrait.satisfies(toTrait))
             return rel;
 
+        // right now we cannot create a multi-column affinity
+        // key object, thus this conversion is impossible
+        if (toTrait.function().affinity() && toTrait.getKeys().size() > 1)
+            return null;
+
         RelTraitSet traits = rel.getTraitSet().replace(toTrait);
         if (fromTrait.getType() == BROADCAST_DISTRIBUTED && toTrait.getType() == HASH_DISTRIBUTED)
             return new IgniteTrimExchange(rel.getCluster(), traits, rel, toTrait);
@@ -365,7 +373,7 @@ public class TraitUtils {
         if (distribution.getType() != HASH_DISTRIBUTED)
             return distribution;
 
-        Mappings.TargetMapping mapping = getPartialMapping(inputRowType.getFieldCount(), projects);
+        Mappings.TargetMapping mapping = createProjectionMapping(inputRowType.getFieldCount(), projects);
 
         return distribution.apply(mapping);
     }
@@ -489,6 +497,24 @@ public class TraitUtils {
         );
     }
 
+    /**
+     * Creates mapping from provided projects that maps a source column idx
+     * to idx in a row after applying projections.
+     *
+     * @param inputFieldCount Size of a source row.
+     * @param projects Projections.
+     */
+    private static Mappings.TargetMapping createProjectionMapping(int inputFieldCount, List<? extends RexNode> projects) {
+        Map<Integer, Integer> src2target = new HashMap<>();
+
+        for (Ord<RexNode> exp : Ord.<RexNode>zip(projects)) {
+            if (exp.e instanceof RexInputRef)
+                src2target.putIfAbsent(((RexInputRef) exp.e).getIndex(), exp.i);
+        }
+
+        return Mappings.target(src -> src2target.getOrDefault(src, -1), inputFieldCount, projects.size());
+    }
+
     /** */
     private static class PropagationContext {
         /** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
index ceec400..29f7786 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
@@ -42,7 +42,7 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         startGrids(3);
 
-        client = startClientGrid();
+        client = startClientGrid("client");
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index ac1b6ca0..c1a1062 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
@@ -38,9 +40,14 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -789,6 +796,59 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         assertNull(row);
     }
 
+    /**
+     * Test verifies that table has a distribution function over valid keys.
+     */
+    @Test
+    public void testTableDistributionKeysForComplexKeyObject() {
+        {
+            class MyKey {
+                int id1;
+
+                int id2;
+            }
+
+            LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+            fields.put("ID1", Integer.class.getName());
+            fields.put("ID2", Integer.class.getName());
+            fields.put("VAL", String.class.getName());
+
+            client.getOrCreateCache(new CacheConfiguration<MyKey, String>("test_cache_1")
+                .setSqlSchema("PUBLIC")
+                .setQueryEntities(F.asList(
+                    new QueryEntity(MyKey.class.getName(), String.class.getName())
+                        .setTableName("MY_TBL_1")
+                        .setFields(fields)
+                        .setKeyFields(ImmutableSet.of("ID1", "ID2"))
+                        .setValueFieldName("VAL")
+                ))
+                .setBackups(2)
+            );
+        }
+
+        {
+            client.getOrCreateCache(new CacheConfiguration<Key, String>("test_cache_2")
+                .setSqlSchema("PUBLIC")
+                .setQueryEntities(F.asList(
+                    new QueryEntity(Key.class, String.class)
+                        .setTableName("MY_TBL_2")
+                ))
+                .setBackups(2)
+            );
+        }
+
+        CalciteQueryProcessor qryProc = Commons.lookupComponent(client.context(), CalciteQueryProcessor.class);
+
+        Map<String, IgniteSchema> schemas = GridTestUtils.getFieldValue(qryProc, "schemaHolder", "igniteSchemas");
+
+        IgniteSchema pub = schemas.get("PUBLIC");
+
+        Map<String, IgniteTable> tblMap = GridTestUtils.getFieldValue(pub, "tblMap");
+
+        assertEquals(ImmutableIntList.of(2, 3), tblMap.get("MY_TBL_1").descriptor().distribution().getKeys());
+        assertEquals(ImmutableIntList.of(3), tblMap.get("MY_TBL_2").descriptor().distribution().getKeys());
+    }
+
     /** for test purpose only. */
     public void testThroughput() {
         IgniteCache<Integer, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index 56f880e..c50b1ea 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.internal.processors.query.QueryEngine;
 import org.apache.ignite.internal.util.typedef.F;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 4ec8f16..d172fad 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -437,6 +437,9 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
     /** */
     abstract static class TestTable implements IgniteTable {
         /** */
+        private final String name;
+
+        /** */
         private final RelProtoDataType protoType;
 
         /** */
@@ -460,9 +463,15 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
 
         /** */
         TestTable(RelDataType type, RewindabilityTrait rewindable, double rowCnt) {
+            this(UUID.randomUUID().toString(), type, rewindable, rowCnt);
+        }
+
+        /** */
+        TestTable(String name, RelDataType type, RewindabilityTrait rewindable, double rowCnt) {
             protoType = RelDataTypeImpl.proto(type);
             this.rewindable = rewindable;
             this.rowCnt = rowCnt;
+            this.name = name;
         }
 
         /** {@inheritDoc} */
@@ -604,6 +613,11 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
         @Override public void removeIndex(String idxName) {
             throw new AssertionError();
         }
+
+        /** */
+        public String name() {
+            return name;
+        }
     }
 
     /** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
new file mode 100644
index 0000000..e378766
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.hamcrest.CustomMatcher;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test suite to verify join colocation.
+ */
+public class JoinColocationPlannerTest extends AbstractPlannerTest {
+    /** */
+    private static final IgniteTypeFactory TYPE_FACTORY = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+    /** */
+    private static final int DEFAULT_TBL_SIZE = 500_000;
+
+    /**
+     * Join of the same tables with a simple affinity is expected to be colocated.
+     */
+    @Test
+    public void joinSameTableSimpleAff() throws Exception {
+        TestTable tbl = createTable(
+            "TEST_TBL",
+            IgniteDistributions.affinity(0, "default", "hash"),
+            "ID", Integer.class,
+            "VAL", String.class
+        );
+
+        tbl.addIndex(new IgniteIndex(RelCollations.of(0), "PK", null, tbl));
+
+        IgniteSchema schema = createSchema(tbl);
+
+        String sql = "select count(*) " +
+            "from TEST_TBL t1 " +
+            "join TEST_TBL t2 on t1.id = t2.id";
+
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+
+        IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class));
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, join, notNullValue());
+        assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
+        assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
+        assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
+    }
+
+    /**
+     * Join of the same tables with a complex affinity is expected to be colocated.
+     */
+    @Test
+    public void joinSameTableComplexAff() throws Exception {
+        TestTable tbl = createTable(
+            "TEST_TBL",
+            IgniteDistributions.affinity(ImmutableIntList.of(0, 1), CU.cacheId("default"), "hash"),
+            "ID1", Integer.class,
+            "ID2", Integer.class,
+            "VAL", String.class
+        );
+
+        tbl.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(0, 1)), "PK", null, tbl));
+
+        IgniteSchema schema = createSchema(tbl);
+
+        String sql = "select count(*) " +
+            "from TEST_TBL t1 " +
+            "join TEST_TBL t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
+
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+
+        IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class));
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, join, notNullValue());
+        assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
+        assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class));
+        assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class));
+    }
+
+    /**
+     * Re-hashing based on simple affinity is possible, so bigger table with complex affinity
+     * should be sended to the smaller one.
+     */
+    @Test
+    public void joinComplexToSimpleAff() throws Exception {
+        TestTable complexTbl = createTable(
+            "COMPLEX_TBL",
+            2 * DEFAULT_TBL_SIZE,
+            IgniteDistributions.affinity(ImmutableIntList.of(0, 1), CU.cacheId("default"), "hash"),
+            "ID1", Integer.class,
+            "ID2", Integer.class,
+            "VAL", String.class
+        );
+
+        complexTbl.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(0, 1)), "PK", null, complexTbl));
+
+        TestTable simpleTbl = createTable(
+            "SIMPLE_TBL",
+            DEFAULT_TBL_SIZE,
+            IgniteDistributions.affinity(0, "default", "hash"),
+            "ID", Integer.class,
+            "VAL", String.class
+        );
+
+        simpleTbl.addIndex(new IgniteIndex(RelCollations.of(0), "PK", null, simpleTbl));
+
+        IgniteSchema schema = createSchema(complexTbl, simpleTbl);
+
+        String sql = "select count(*) " +
+            "from COMPLEX_TBL t1 " +
+            "join SIMPLE_TBL t2 on t1.id1 = t2.id";
+
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+
+        IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class));
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, join, notNullValue());
+        assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true));
+
+        List<IgniteExchange> exchanges = findNodes(phys, node -> node instanceof IgniteExchange
+            && ((IgniteRel)node).distribution().function().affinity());
+
+        assertThat(invalidPlanMsg, exchanges, hasSize(1));
+        assertThat(invalidPlanMsg, exchanges.get(0).getInput(0), instanceOf(IgniteIndexScan.class));
+        assertThat(invalidPlanMsg, exchanges.get(0).getInput(0)
+            .getTable().unwrap(TestTable.class), equalTo(complexTbl));
+    }
+
+    /**
+     * Re-hashing for complex affinity is not supported.
+     */
+    @Test
+    public void joinComplexToComplexAffWithDifferentOrder() throws Exception {
+        TestTable complexTblDirect = createTable(
+            "COMPLEX_TBL_DIRECT",
+            IgniteDistributions.affinity(ImmutableIntList.of(0, 1), CU.cacheId("default"), "hash"),
+            "ID1", Integer.class,
+            "ID2", Integer.class,
+            "VAL", String.class
+        );
+
+        complexTblDirect.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(0, 1)), "PK", null, complexTblDirect));
+
+        TestTable complexTblIndirect = createTable(
+            "COMPLEX_TBL_INDIRECT",
+            IgniteDistributions.affinity(ImmutableIntList.of(1, 0), CU.cacheId("default"), "hash"),
+            "ID1", Integer.class,
+            "ID2", Integer.class,
+            "VAL", String.class
+        );
+
+        complexTblIndirect.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(0, 1)), "PK", null, complexTblIndirect));
+
+        IgniteSchema schema = createSchema(complexTblDirect, complexTblIndirect);
+
+        String sql = "select count(*) " +
+            "from COMPLEX_TBL_DIRECT t1 " +
+            "join COMPLEX_TBL_INDIRECT t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2";
+
+        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
+
+        IgniteMergeJoin exchange = findFirstNode(phys, node -> node instanceof IgniteExchange
+            && ((IgniteRel)node).distribution().function().affinity());
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, exchange, nullValue());
+    }
+
+    /**
+     * Creates test table with given params.
+     *
+     * @param name Name of the table.
+     * @param distr Distribution of the table.
+     * @param fields List of the required fields. Every odd item should be a string
+     *               representing a column name, every even item should be a class representing column's type.
+     *               E.g. {@code createTable("MY_TABLE", distribution, "ID", Integer.class, "VAL", String.class)}.
+     * @return Instance of the {@link TestTable}.
+     */
+    private static TestTable createTable(String name, IgniteDistribution distr, Object... fields) {
+        return createTable(name, DEFAULT_TBL_SIZE, distr, fields);
+    }
+
+    /**
+     * Creates test table with given params.
+     *
+     * @param name Name of the table.
+     * @param size Required size of the table.
+     * @param distr Distribution of the table.
+     * @param fields List of the required fields. Every odd item should be a string
+     *               representing a column name, every even item should be a class representing column's type.
+     *               E.g. {@code createTable("MY_TABLE", 500, distribution, "ID", Integer.class, "VAL", String.class)}.
+     * @return Instance of the {@link TestTable}.
+     */
+    private static TestTable createTable(String name, int size, IgniteDistribution distr, Object... fields) {
+        if (F.isEmpty(fields) || fields.length % 2 != 0)
+            throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
+
+        RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(TYPE_FACTORY);
+
+        for (int i = 0; i < fields.length; i += 2)
+            b.add((String)fields[i], TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1]));
+
+        return new TestTable(name, b.build(), RewindabilityTrait.REWINDABLE, size) {
+            @Override public IgniteDistribution distribution() {
+                return distr;
+            }
+        };
+    }
+
+    /**
+     * Creates public schema from provided tables.
+     *
+     * @param tbls Tables to create schema for.
+     * @return Public schema.
+     */
+    private static IgniteSchema createSchema(TestTable... tbls) {
+        IgniteSchema schema = new IgniteSchema("PUBLIC");
+
+        for (TestTable tbl : tbls)
+            schema.addTable(tbl.name(), tbl);
+
+        return schema;
+    }
+
+    /**
+     * Matcher to verify size of the collection.
+     *
+     * @param size Required size.
+     * @return {@code true} in case collection is not null and has an exactly the same size.
+     */
+    private static <T extends Collection<?>> Matcher<T> hasSize(int size) {
+        return new CustomMatcher<T>("should be non empty with size=" + size) {
+            @Override public boolean matches(Object item) {
+                return item instanceof Collection && ((Collection<?>)item).size() == size;
+            }
+        };
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 4c7f024..c6cb5ba 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlan
 import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.IndexSpoolPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
@@ -39,6 +40,7 @@ import org.junit.runners.Suite;
     AggregatePlannerTest.class,
     HashAggregatePlannerTest.class,
     SortAggregatePlannerTest.class,
+    JoinColocationPlannerTest.class,
 })
 public class PlannerTestSuite {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 59f9488..476495e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -168,7 +168,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
     /** Patter to test incoming query to decide whether this query should be executed with Calcite or H2. */
     private static final Pattern IS_SELECT_OR_EXPLAIN_PATTERN =
-        Pattern.compile("^\\s*(select|explain plan for)", CASE_INSENSITIVE);
+        Pattern.compile("^\\s*(select|explain plan)", CASE_INSENSITIVE);
 
     /** For tests. */
     public static Class<? extends GridQueryIndexing> idxCls;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 39b6e6f..c2b554e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
-import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -1293,20 +1292,6 @@ public class GridH2Table extends TableBase {
      * Refreshes table stats if they are possibly outdated, must be called only in client mode.
      */
     private void refreshStatsIfNeededEx() {
-        boolean client = cacheInfo.cacheContext().kernalContext().clientNode();
-
-        GridQueryProcessor qryProc = cacheInfo.cacheContext().kernalContext().query();
-        boolean experimental = qryProc.useExperimentalEngine();
-
-        assert experimental;
-
-        if (!client) {
-            refreshStatsIfNeeded();
-
-            return;
-        }
-
-        // Update stats if total table size changed significantly since the last stats update.
         if (cliReqCnt.getAndIncrement() % STATS_CLI_UPDATE_THRESHOLD == 0) {
             TableStatistics stats = tblStats;