You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/12/09 21:34:24 UTC

[ignite] branch ignite-12248 updated: planner rethinking

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new b45e813  planner rethinking
b45e813 is described below

commit b45e813df3989d5edbf1262678381fdc7caec2c5
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Dec 10 00:34:04 2019 +0300

    planner rethinking
---
 .../{Util.java => InterpreterUtils.java}           |   2 +-
 .../Util.java => plan/volcano/VolcanoUtils.java}   |  16 +-
 .../query/calcite/CalciteQueryProcessor.java       |   2 -
 .../cluster/TableDistributionServiceImpl.java      |  87 ----
 .../query/calcite/exec/ConsumerNode.java           |   2 +-
 .../query/calcite/exec/ImplementorImpl.java        | 174 +++++++
 .../query/calcite/exec/Interpretable.java          | 510 ---------------------
 .../query/calcite/exec/ScalarFactory.java          | 157 +++++++
 .../processors/query/calcite/exec/ScanNode.java    |   2 +-
 .../query/calcite/metadata/FragmentInfo.java       |  10 +-
 .../metadata/IgniteMdDerivedDistribution.java      | 149 ++++++
 .../calcite/metadata/IgniteMdDistribution.java     | 196 +++-----
 .../calcite/metadata/IgniteMdFragmentInfo.java     |  27 +-
 .../query/calcite/metadata/IgniteMetadata.java     |  29 +-
 .../metadata/OptimisticPlanningException.java      |   2 +-
 .../query/calcite/metadata/RelMetadataQueryEx.java |  38 +-
 .../calcite/prepare/DistributedExecution.java      |   8 +-
 .../query/calcite/prepare/IgnitePlanner.java       |  35 +-
 .../query/calcite/prepare/PlannerContext.java      |  23 +-
 .../calcite/{rule => prepare}/PlannerPhase.java    |  29 +-
 .../calcite/{rule => prepare}/PlannerType.java     |   2 +-
 .../query/calcite/rel/IgniteConvention.java        |  41 ++
 .../query/calcite/rel/IgniteExchange.java          |  43 +-
 .../processors/query/calcite/rel/IgniteFilter.java |  85 +---
 .../processors/query/calcite/rel/IgniteJoin.java   |  65 +--
 .../query/calcite/rel/IgniteProject.java           |  77 +---
 .../query/calcite/rel/IgniteReceiver.java          |  39 +-
 .../processors/query/calcite/rel/IgniteRel.java    |  26 +-
 .../calcite/{util => rel}/IgniteRelShuttle.java    |   9 +-
 .../processors/query/calcite/rel/IgniteSender.java |  44 +-
 .../query/calcite/rel/IgniteTableScan.java         |  49 +-
 .../DistributionType.java => rel/Implementor.java} |  41 +-
 .../query/calcite/{util => rel}/RelOp.java         |   2 +-
 .../calcite/rule/AbstractVariableConverter.java    |  47 ++
 .../query/calcite/rule/FilterConverter.java        |  62 +++
 .../query/calcite/rule/IgniteFilterRule.java       |  56 ---
 .../query/calcite/rule/IgniteJoinRule.java         | 140 ------
 .../query/calcite/rule/IgniteProjectRule.java      |  57 ---
 .../processors/query/calcite/rule/IgniteRules.java | 160 -------
 .../query/calcite/rule/JoinConverter.java          |  68 +++
 .../query/calcite/rule/ProjectConverter.java       |  63 +++
 .../TableScanConverter.java}                       |  25 +-
 .../query/calcite/schema/IgniteSchema.java         |   6 +-
 .../query/calcite/schema/IgniteTable.java          |  67 ++-
 .../serialize/relation/ConversionContext.java      |   4 +-
 .../calcite/serialize/relation/FilterNode.java     |  30 +-
 .../serialize/relation/GraphToRelConverter.java    |   7 +-
 .../query/calcite/serialize/relation/JoinNode.java |  10 +-
 .../calcite/serialize/relation/ProjectNode.java    |  19 +-
 .../calcite/serialize/relation/ReceiverNode.java   |  10 +-
 .../serialize/relation/RelToGraphConverter.java    |  10 +-
 .../calcite/serialize/relation/SenderNode.java     |  22 +-
 .../serialize/relation/SerializedTraits.java       |  28 +-
 .../calcite/serialize/relation/TableScanNode.java  |  10 +-
 .../query/calcite/{util => splitter}/Edge.java     |   2 +-
 .../query/calcite/splitter/Fragment.java           |  15 +-
 .../query/calcite/splitter/QueryPlan.java          |   7 +-
 .../splitter/{Source.java => RelSource.java}       |   8 +-
 .../{SourceImpl.java => RelSourceImpl.java}        |   4 +-
 .../splitter/{Target.java => RelTarget.java}       |   6 +-
 .../{TargetImpl.java => RelTargetImpl.java}        |  10 +-
 .../query/calcite/splitter/Splitter.java           |   2 +-
 ...shFunctionFactory.java => AffinityFactory.java} |  42 +-
 .../query/calcite/trait/DistributionTrait.java     |  83 +++-
 .../query/calcite/trait/DistributionTraitDef.java  |  43 +-
 .../query/calcite/trait/HashFunctionFactory.java   |   2 +-
 .../IgniteDistribution.java}                       |  11 +-
 .../query/calcite/trait/IgniteDistributions.java   | 252 ++++++++--
 .../processors/query/calcite/util/Commons.java     |  49 +-
 .../query/calcite/util/IgniteMethod.java           |   4 +-
 .../query/calcite/util/RelImplementor.java         |  40 --
 .../query/calcite/CalciteQueryProcessorTest.java   | 265 +++++------
 72 files changed, 1729 insertions(+), 2038 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java b/modules/calcite/src/main/java/org/apache/calcite/interpreter/InterpreterUtils.java
similarity index 96%
copy from modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
copy to modules/calcite/src/main/java/org/apache/calcite/interpreter/InterpreterUtils.java
index a4ccb83..edae3cc 100644
--- a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
+++ b/modules/calcite/src/main/java/org/apache/calcite/interpreter/InterpreterUtils.java
@@ -21,7 +21,7 @@ import org.apache.calcite.DataContext;
 /**
  *
  */
-public class Util {
+public class InterpreterUtils {
     public static Context createContext(DataContext ctx) {
         return new Context(ctx);
     }
diff --git a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java b/modules/calcite/src/main/java/org/apache/calcite/plan/volcano/VolcanoUtils.java
similarity index 63%
rename from modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
rename to modules/calcite/src/main/java/org/apache/calcite/plan/volcano/VolcanoUtils.java
index a4ccb83..a1874dc 100644
--- a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
+++ b/modules/calcite/src/main/java/org/apache/calcite/plan/volcano/VolcanoUtils.java
@@ -14,15 +14,21 @@
  * limitations under the License.
  */
 
-package org.apache.calcite.interpreter;
+package org.apache.calcite.plan.volcano;
 
-import org.apache.calcite.DataContext;
+import org.apache.calcite.plan.RelTraitSet;
 
 /**
  *
  */
-public class Util {
-    public static Context createContext(DataContext ctx) {
-        return new Context(ctx);
+public class VolcanoUtils {
+    public static RelSubset subset(RelSubset subset, RelTraitSet traits) {
+        return subset.set.getOrCreateSubset(subset.getCluster(), traits.simplify());
+    }
+
+    public static VolcanoPlanner impatient(VolcanoPlanner planner) {
+        planner.impatient = true;
+
+        return planner;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index c244f4e..86a9a2e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.QueryEngine;
 import org.apache.ignite.internal.processors.query.calcite.cluster.MappingServiceImpl;
-import org.apache.ignite.internal.processors.query.calcite.cluster.TableDistributionServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.DistributedExecution;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
@@ -154,7 +153,6 @@ public class CalciteQueryProcessor implements QueryEngine {
             .query(query)
             .schema(schemaHolder.schema())
             .topologyVersion(readyAffinityVersion())
-            .distributionService(new TableDistributionServiceImpl(kernalContext))
             .mappingService(new MappingServiceImpl(kernalContext))
             .build();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java
deleted file mode 100644
index 20da342..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.cluster;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.function.ToIntFunction;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.type.RowType;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- *
- */
-public class TableDistributionServiceImpl implements TableDistributionService {
-    private final GridKernalContext ctx;
-
-    public TableDistributionServiceImpl(GridKernalContext ctx) {
-        this.ctx = ctx;
-    }
-
-    @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-        CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group();
-
-        if (grp.isReplicated())
-            return IgniteDistributions.broadcast();
-
-        Object key = grp.affinity().similarAffinityKey();
-
-        return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key));
-    }
-
-    private final static class AffinityFactory extends AbstractDestinationFunctionFactory {
-        private final int cacheId;
-        private final Object key;
-
-        AffinityFactory(int cacheId, Object key) {
-            this.cacheId = cacheId;
-            this.key = key;
-        }
-
-        @Override public DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys) {
-            assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
-
-            List<List<UUID>> assignments = mapping.assignments();
-
-            if (U.assertionsEnabled()) {
-                for (List<UUID> assignment : assignments) {
-                    assert F.isEmpty(assignment) || assignment.size() == 1;
-                }
-            }
-
-            ToIntFunction<Object> rowToPart = ctx.kernalContext()
-                .cache().context().cacheContext(cacheId).affinity()::partition;
-
-            return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
-        }
-
-        @Override public Object key() {
-            return key;
-        }
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
index c2a6211..100f21f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
@@ -30,7 +30,7 @@ public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<O
 
     private ArrayDeque<Object[]> buff;
 
-    protected ConsumerNode() {
+    public ConsumerNode() {
         super(Sink.noOp());
 
         buff = new ArrayDeque<>(DEFAULT_BUFFER_SIZE);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
new file mode 100644
index 0000000..2a65887
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Outbox;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Implementor;
+import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
+import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+
+import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.PLANNER_CONTEXT;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.QUERY_ID;
+
+/**
+ *
+ */
+public class ImplementorImpl implements Implementor<Node<Object[]>>, RelOp<IgniteRel, Node<Object[]>> {
+    private final PlannerContext ctx;
+    private final DataContext root;
+    private final ScalarFactory factory;
+    private Deque<Sink<Object[]>> stack;
+
+    public ImplementorImpl(DataContext root) {
+        this.root = root;
+
+        ctx = PLANNER_CONTEXT.get(root);
+        factory = new ScalarFactory(new RexBuilder(ctx.typeFactory()));
+        stack = new ArrayDeque<>();
+    }
+
+    @Override public Node<Object[]> implement(IgniteSender rel) {
+        assert stack.isEmpty();
+
+        GridCacheVersion id = QUERY_ID.get(root);
+        long exchangeId = rel.target().exchangeId();
+        NodesMapping mapping = rel.target().mapping();
+        List<UUID> targets = mapping.nodes();
+        IgniteDistribution distribution = rel.target().distribution();
+        DestinationFunctionFactory destFactory = distribution.destinationFunctionFactory();
+        DestinationFunction function = destFactory.create(ctx, mapping, ImmutableIntList.copyOf(distribution.getKeys()));
+
+        Outbox<Object[]> res = new Outbox<>(id, exchangeId, targets, function);
+
+        stack.push(res.sink());
+
+        res.source(source(rel.getInput()));
+
+        return res;
+    }
+
+    @Override public Node<Object[]> implement(IgniteFilter rel) {
+        assert !stack.isEmpty();
+
+        FilterNode res = new FilterNode(stack.pop(), factory.filterPredicate(root, rel.getCondition(), rel.getRowType()));
+
+        stack.push(res.sink());
+
+        res.source(source(rel.getInput()));
+
+        return res;
+    }
+
+    @Override public Node<Object[]> implement(IgniteProject rel) {
+        assert !stack.isEmpty();
+
+        ProjectNode res = new ProjectNode(stack.pop(), factory.projectExpression(root, rel.getProjects(), rel.getInput().getRowType()));
+
+        stack.push(res.sink());
+
+        res.source(source(rel.getInput()));
+
+        return res;
+    }
+
+    @Override public Node<Object[]> implement(IgniteJoin rel) {
+        assert !stack.isEmpty();
+
+        JoinNode res = new JoinNode(stack.pop(), factory.joinExpression(root, rel.getCondition(), rel.getLeft().getRowType(), rel.getRight().getRowType()));
+
+        stack.push(res.sink(1));
+        stack.push(res.sink(0));
+
+        res.sources(sources(rel.getInputs()));
+
+        return res;
+    }
+
+    @Override public Node<Object[]> implement(IgniteTableScan rel) {
+        assert !stack.isEmpty();
+
+        Iterable<Object[]> source = rel.getTable().unwrap(ScannableTable.class).scan(root);
+
+        return new ScanNode(stack.pop(), source);
+    }
+
+    @Override public Node<Object[]> implement(IgniteReceiver rel) {
+        throw new AssertionError(); // TODO
+    }
+
+    @Override public Node<Object[]> implement(IgniteExchange rel) {
+        throw new AssertionError();
+    }
+
+    @Override public Node<Object[]> implement(IgniteRel other) {
+        throw new AssertionError();
+    }
+
+    private Source source(RelNode rel) {
+        if (rel.getConvention() != IgniteConvention.INSTANCE)
+            throw new IllegalStateException("INTERPRETABLE is required.");
+
+        return ((IgniteRel) rel).implement(this);
+    }
+
+    private List<Source> sources(List<RelNode> rels) {
+        ArrayList<Source> res = new ArrayList<>(rels.size());
+
+        for (RelNode rel : rels) {
+            res.add(source(rel));
+        }
+
+        return res;
+    }
+
+    @Override public Node<Object[]> go(IgniteRel rel) {
+        if (rel instanceof IgniteSender)
+            return implement((IgniteSender) rel);
+
+        ConsumerNode res = new ConsumerNode();
+
+        stack.push(res.sink());
+
+        res.source(source(rel));
+
+        return res;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java
deleted file mode 100644
index 04108d4..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.exec;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.JaninoRexCompiler;
-import org.apache.calcite.interpreter.Scalar;
-import org.apache.calcite.interpreter.Util;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.AbstractRelNode;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.calcite.exchange.Outbox;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.jetbrains.annotations.NotNull;
-
-import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.PLANNER_CONTEXT;
-import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.QUERY_ID;
-
-/**
- *
- */
-public class Interpretable {
-    public static final Convention INTERPRETABLE = new Convention.Impl("INTERPRETABLE", InterRel.class) {};
-
-    public static final List<RelOptRule> RULES = ImmutableList.of(
-        new TableScanConverter(),
-        new JoinConverter(),
-        new ProjectConverter(),
-        new FilterConverter(),
-        new SenderConverter(),
-        new ReceiverConverter()
-    );
-
-    public interface InterRel extends RelNode {
-        <T> Node<T> implement(Implementor<T> implementor);
-    }
-
-    public static class TableScanConverter extends ConverterRule {
-        public TableScanConverter() {
-            super(IgniteTableScan.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "TableScanConverter");
-        }
-
-        @Override public RelNode convert(RelNode rel) {
-            IgniteTableScan scan = (IgniteTableScan) rel;
-
-            RelTraitSet traitSet = scan.getTraitSet().replace(INTERPRETABLE);
-            return new ScanRel(rel.getCluster(), traitSet, scan.getTable());
-        }
-    }
-
-    public static class JoinConverter extends ConverterRule {
-        public JoinConverter() {
-            super(IgniteJoin.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "JoinConverter");
-        }
-
-        @Override public RelNode convert(RelNode rel) {
-            IgniteJoin join = (IgniteJoin) rel;
-
-            RelNode left = convert(join.getLeft(), INTERPRETABLE);
-            RelNode right = convert(join.getRight(), INTERPRETABLE);
-
-            RelTraitSet traitSet = join.getTraitSet().replace(INTERPRETABLE);
-
-            return new JoinRel(rel.getCluster(), traitSet, left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType());
-        }
-    }
-
-    public static class ProjectConverter extends ConverterRule {
-        public ProjectConverter() {
-            super(IgniteProject.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "ProjectConverter");
-        }
-
-        @Override public RelNode convert(RelNode rel) {
-            IgniteProject project = (IgniteProject) rel;
-            RelTraitSet traitSet = project.getTraitSet().replace(INTERPRETABLE);
-            RelNode input = convert(project.getInput(), INTERPRETABLE);
-
-            return new ProjectRel(rel.getCluster(), traitSet, input, project.getProjects(), project.getRowType());
-        }
-    }
-
-    public static class FilterConverter extends ConverterRule {
-        public FilterConverter() {
-            super(IgniteFilter.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "FilterConverter");
-        }
-
-        @Override public RelNode convert(RelNode rel) {
-            IgniteFilter filter = (IgniteFilter) rel;
-            RelTraitSet traitSet = filter.getTraitSet().replace(INTERPRETABLE);
-            RelNode input = convert(filter.getInput(), INTERPRETABLE);
-
-            return new FilterRel(rel.getCluster(), traitSet, input, filter.getCondition());
-        }
-    }
-
-    public static class SenderConverter extends ConverterRule {
-        public SenderConverter() {
-            super(IgniteSender.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "SenderConverter");
-        }
-
-        @Override public RelNode convert(RelNode rel) {
-            IgniteSender sender = (IgniteSender) rel;
-            RelTraitSet traitSet = sender.getTraitSet().replace(INTERPRETABLE);
-            RelNode input = convert(sender.getInput(), INTERPRETABLE);
-
-            return new SenderRel(rel.getCluster(), traitSet, input, sender.target());
-        }
-    }
-
-    public static class ReceiverConverter extends ConverterRule {
-        public ReceiverConverter() {
-            super(IgniteReceiver.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "ReceiverConverter");
-        }
-
-        @Override public RelNode convert(RelNode rel) {
-            IgniteReceiver sender = (IgniteReceiver) rel;
-            RelTraitSet traitSet = sender.getTraitSet().replace(INTERPRETABLE);
-
-            return new ReceiverRel(rel.getCluster(), traitSet, sender.source());
-        }
-    }
-
-    public static class ReceiverRel extends AbstractRelNode implements InterRel {
-        private final org.apache.ignite.internal.processors.query.calcite.splitter.Source source;
-
-        protected ReceiverRel(RelOptCluster cluster, RelTraitSet traits, org.apache.ignite.internal.processors.query.calcite.splitter.Source source) {
-            super(cluster, traits);
-
-            this.source = source;
-        }
-
-        @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-            return new ReceiverRel(getCluster(), traitSet, source);
-        }
-
-        @Override public <T> Node<T> implement(Implementor<T> implementor) {
-            return implementor.implement(this);
-        }
-    }
-
-    public static class SenderRel extends SingleRel implements InterRel {
-        private final Target target;
-
-        protected SenderRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, @NotNull Target target) {
-            super(cluster, traits, input);
-            this.target = target;
-        }
-
-        @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-            return new SenderRel(getCluster(), traitSet, sole(inputs), target);
-        }
-
-        @Override public <T> Node<T> implement(Implementor<T> implementor) {
-            return implementor.implement(this);
-        }
-    }
-
-    public static class FilterRel extends Filter implements InterRel {
-        protected FilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
-            super(cluster, traits, child, condition);
-        }
-
-        @Override public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
-            return new FilterRel(getCluster(), traitSet, input, condition);
-        }
-
-        @Override public <T> Node<T> implement(Implementor<T> implementor) {
-            return implementor.implement(this);
-        }
-    }
-
-    public static class ProjectRel extends Project implements InterRel {
-        protected ProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-            super(cluster, traits, input, projects, rowType);
-        }
-
-        @Override public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
-            return new ProjectRel(getCluster(), traitSet, input, projects, rowType);
-        }
-
-        @Override public <T> Node<T> implement(Implementor<T> implementor) {
-            return implementor.implement(this);
-        }
-    }
-
-    public static class JoinRel extends Join implements InterRel {
-        protected JoinRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
-            super(cluster, traitSet, left, right, condition, variablesSet, joinType);
-        }
-
-        @Override public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
-            return new JoinRel(getCluster(), traitSet, left, right, condition, variablesSet, joinType);
-        }
-
-        @Override public <T> Node<T> implement(Implementor<T> implementor) {
-            return implementor.implement(this);
-        }
-    }
-
-    public static class ScanRel extends TableScan implements InterRel {
-        protected ScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
-            super(cluster, traitSet, table);
-        }
-
-        @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-            return this;
-        }
-
-        @Override public <T> Node<T> implement(Implementor<T> implementor) {
-            return implementor.implement(this);
-        }
-    }
-
-    public static class Implementor<T> {
-        private final PlannerContext ctx;
-        private final DataContext root;
-        private final ScalarFactory factory;
-        private Deque<Sink<T>> stack;
-
-        public Implementor(DataContext root) {
-            this.root = root;
-
-            ctx = PLANNER_CONTEXT.get(root);
-            factory = new ScalarFactory(new RexBuilder(ctx.typeFactory()));
-            stack = new ArrayDeque<>();
-        }
-
-        public Node<T> implement(SenderRel rel) {
-            assert stack.isEmpty();
-
-            GridCacheVersion id = QUERY_ID.get(root);
-            long exchangeId = rel.target.exchangeId();
-            NodesMapping mapping = rel.target.mapping();
-            List<UUID> targets = mapping.nodes();
-            DistributionTrait distribution = rel.target.distribution();
-            DestinationFunctionFactory destFactory = distribution.destinationFunctionFactory();
-            DestinationFunction function = destFactory.create(ctx, mapping, distribution.keys());
-
-            Outbox<T> res = new Outbox<>(id, exchangeId, targets, function);
-
-            stack.push(res.sink());
-
-            res.source(source(rel.getInput()));
-
-            return res;
-        }
-
-        public Node<T> implement(FilterRel rel) {
-            assert !stack.isEmpty();
-
-            FilterNode res = new FilterNode((Sink<Object[]>) stack.pop(), factory.filterPredicate(root, rel.getCondition(), rel.getRowType()));
-
-            stack.push((Sink<T>) res.sink());
-
-            res.source(source(rel.getInput()));
-
-            return (Node<T>) res;
-        }
-
-        public Node<T> implement(ProjectRel rel) {
-            assert !stack.isEmpty();
-
-            ProjectNode res = new ProjectNode((Sink<Object[]>) stack.pop(), factory.projectExpression(root, rel.getProjects(), rel.getInput().getRowType()));
-
-            stack.push((Sink<T>) res.sink());
-
-            res.source(source(rel.getInput()));
-
-            return (Node<T>) res;
-        }
-
-        public Node<T> implement(JoinRel rel) {
-            assert !stack.isEmpty();
-
-            JoinNode res = new JoinNode((Sink<Object[]>) stack.pop(), factory.joinExpression(root, rel.getCondition(), rel.getLeft().getRowType(), rel.getRight().getRowType()));
-
-            stack.push((Sink<T>) res.sink(1));
-            stack.push((Sink<T>) res.sink(0));
-
-            res.sources(sources(rel.getInputs()));
-
-            return (Node<T>) res;
-        }
-
-        public Node<T> implement(ScanRel rel) {
-            assert !stack.isEmpty();
-
-            Iterable<Object[]> source = rel.getTable().unwrap(ScannableTable.class).scan(root);
-
-            return (Node<T>) new ScanNode((Sink<Object[]>)stack.pop(), source);
-        }
-
-        public Node<T> implement(ReceiverRel rel) {
-            throw new AssertionError(); // TODO
-        }
-
-        private Source source(RelNode rel) {
-            if (rel.getConvention() != INTERPRETABLE)
-                throw new IllegalStateException("INTERPRETABLE is required.");
-
-            return ((InterRel)rel).implement(this);
-        }
-
-        private List<Source> sources(List<RelNode> rels) {
-            ArrayList<Source> res = new ArrayList<>(rels.size());
-
-            for (RelNode rel : rels) {
-                res.add(source(rel));
-            }
-
-            return res;
-        }
-
-        public Node<T> go(RelNode rel) {
-            if (rel.getConvention() != INTERPRETABLE)
-                throw new IllegalStateException("INTERPRETABLE is required.");
-
-            if (rel instanceof SenderRel)
-                return implement((SenderRel)rel);
-
-            ConsumerNode res = new ConsumerNode();
-
-            stack.push((Sink<T>) res.sink());
-
-            res.source(source(rel));
-
-            return (Node<T>) res;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class ScalarFactory {
-        private final JaninoRexCompiler rexCompiler;
-        private final RexBuilder builder;
-
-        private ScalarFactory(RexBuilder builder) {
-            rexCompiler = new JaninoRexCompiler(builder);
-            this.builder = builder;
-        }
-
-        public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) {
-            System.out.println("filterPredicate for" + filter);
-
-            Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType);
-            Context ctx = Util.createContext(root);
-
-            return new FilterPredicate<>(ctx, scalar);
-        }
-
-        public <T> Function<T, T> projectExpression(DataContext root, List<RexNode> projects, RelDataType rowType) {
-            System.out.println("joinExpression for" + projects);
-
-            Scalar scalar = rexCompiler.compile(projects, rowType);
-            Context ctx = Util.createContext(root);
-            int count = projects.size();
-
-            return new ProjectExpression<>(ctx, scalar, count);
-        }
-
-        public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) {
-            System.out.println("joinExpression for" + expression);
-
-            RelDataType rowType = combinedType(leftType, rightType);
-
-            Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType);
-            Context ctx = Util.createContext(root);
-            ctx.values = new Object[rowType.getFieldCount()];
-
-            return new JoinExpression<>(ctx, scalar);
-        }
-
-        private RelDataType combinedType(RelDataType... types) {
-            RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory());
-
-            for (RelDataType type : types)
-                typeBuilder.addAll(type.getFieldList());
-
-            return typeBuilder.build();
-        }
-
-        private static class FilterPredicate<T> implements Predicate<T> {
-            private final Context ctx;
-            private final Scalar scalar;
-            private final Object[] vals;
-
-            private FilterPredicate(Context ctx, Scalar scalar) {
-                this.ctx = ctx;
-                this.scalar = scalar;
-
-                vals = new Object[1];
-            }
-
-            @Override public boolean test(T r) {
-                ctx.values = (Object[]) r;
-                scalar.execute(ctx, vals);
-                return (Boolean) vals[0];
-            }
-        }
-
-        private static class JoinExpression<T> implements BiFunction<T, T, T> {
-            private final Object[] vals;
-            private final Context ctx;
-            private final Scalar scalar;
-
-            private Object[] left0;
-
-            private JoinExpression(Context ctx, Scalar scalar) {
-                this.ctx = ctx;
-                this.scalar = scalar;
-
-                vals = new Object[1];
-            }
-
-            @Override public T apply(T left, T right) {
-                if (left0 != left) {
-                    left0 = (Object[]) left;
-                    System.arraycopy(left0, 0, ctx.values, 0, left0.length);
-                }
-
-                Object[] right0 = (Object[]) right;
-                System.arraycopy(right0, 0, ctx.values, left0.length, right0.length);
-
-                scalar.execute(ctx, vals);
-
-                if ((Boolean) vals[0])
-                    return (T) Arrays.copyOf(ctx.values, ctx.values.length);
-
-                return null;
-            }
-        }
-
-        private static class ProjectExpression<T> implements Function<T, T> {
-            private final Context ctx;
-            private final Scalar scalar;
-            private final int count;
-
-            private ProjectExpression(Context ctx, Scalar scalar, int count) {
-                this.ctx = ctx;
-                this.scalar = scalar;
-                this.count = count;
-            }
-
-            @Override public T apply(T r) {
-                ctx.values = (Object[]) r;
-                Object[] res = new Object[count];
-                scalar.execute(ctx, res);
-
-                return (T) res;
-            }
-        }
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
new file mode 100644
index 0000000..1d4e31d
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.InterpreterUtils;
+import org.apache.calcite.interpreter.JaninoRexCompiler;
+import org.apache.calcite.interpreter.Scalar;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ *
+ */
+public class ScalarFactory {
+    private final JaninoRexCompiler rexCompiler;
+    private final RexBuilder builder;
+
+    public ScalarFactory(RexBuilder builder) {
+        rexCompiler = new JaninoRexCompiler(builder);
+        this.builder = builder;
+    }
+
+    public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) {
+        System.out.println("filterPredicate for" + filter);
+
+        Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType);
+        Context ctx = InterpreterUtils.createContext(root);
+
+        return new FilterPredicate<>(ctx, scalar);
+    }
+
+    public <T> Function<T, T> projectExpression(DataContext root, List<RexNode> projects, RelDataType rowType) {
+        System.out.println("joinExpression for" + projects);
+
+        Scalar scalar = rexCompiler.compile(projects, rowType);
+        Context ctx = InterpreterUtils.createContext(root);
+        int count = projects.size();
+
+        return new ProjectExpression<>(ctx, scalar, count);
+    }
+
+    public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) {
+        System.out.println("joinExpression for" + expression);
+
+        RelDataType rowType = combinedType(leftType, rightType);
+
+        Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType);
+        Context ctx = InterpreterUtils.createContext(root);
+        ctx.values = new Object[rowType.getFieldCount()];
+
+        return new JoinExpression<>(ctx, scalar);
+    }
+
+    private RelDataType combinedType(RelDataType... types) {
+        RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory());
+
+        for (RelDataType type : types)
+            typeBuilder.addAll(type.getFieldList());
+
+        return typeBuilder.build();
+    }
+
+    private static class FilterPredicate<T> implements Predicate<T> {
+        private final Context ctx;
+        private final Scalar scalar;
+        private final Object[] vals;
+
+        private FilterPredicate(Context ctx, Scalar scalar) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+
+            vals = new Object[1];
+        }
+
+        @Override public boolean test(T r) {
+            ctx.values = (Object[]) r;
+            scalar.execute(ctx, vals);
+            return (Boolean) vals[0];
+        }
+    }
+
+    private static class JoinExpression<T> implements BiFunction<T, T, T> {
+        private final Object[] vals;
+        private final Context ctx;
+        private final Scalar scalar;
+
+        private Object[] left0;
+
+        private JoinExpression(Context ctx, Scalar scalar) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+
+            vals = new Object[1];
+        }
+
+        @Override public T apply(T left, T right) {
+            if (left0 != left) {
+                left0 = (Object[]) left;
+                System.arraycopy(left0, 0, ctx.values, 0, left0.length);
+            }
+
+            Object[] right0 = (Object[]) right;
+            System.arraycopy(right0, 0, ctx.values, left0.length, right0.length);
+
+            scalar.execute(ctx, vals);
+
+            if ((Boolean) vals[0])
+                return (T) Arrays.copyOf(ctx.values, ctx.values.length);
+
+            return null;
+        }
+    }
+
+    private static class ProjectExpression<T> implements Function<T, T> {
+        private final Context ctx;
+        private final Scalar scalar;
+        private final int count;
+
+        private ProjectExpression(Context ctx, Scalar scalar, int count) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+            this.count = count;
+        }
+
+        @Override public T apply(T r) {
+            ctx.values = (Object[]) r;
+            Object[] res = new Object[count];
+            scalar.execute(ctx, res);
+
+            return (T) res;
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
index 37c128a..6acaa2b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
@@ -32,7 +32,7 @@ public class ScanNode implements SingleNode<Object[]> {
     private Iterator<Object[]> it;
     private Object[] row;
 
-    protected ScanNode(Sink<Object[]> target, Iterable<Object[]> source) {
+    public ScanNode(Sink<Object[]> target, Iterable<Object[]> source) {
         this.target = target;
         this.source = source;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index b282ea7..de15e4d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -19,16 +19,16 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
 
 /**
  *
  */
 public class FragmentInfo {
     private final NodesMapping mapping;
-    private final ImmutableList<Pair<IgniteReceiver, Source>> sources;
+    private final ImmutableList<Pair<IgniteReceiver, RelSource>> sources;
 
-    public FragmentInfo(Pair<IgniteReceiver, Source> source) {
+    public FragmentInfo(Pair<IgniteReceiver, RelSource> source) {
         this(ImmutableList.of(source), null);
     }
 
@@ -36,7 +36,7 @@ public class FragmentInfo {
         this(null, mapping);
     }
 
-    public FragmentInfo(ImmutableList<Pair<IgniteReceiver, Source>> sources, NodesMapping mapping) {
+    public FragmentInfo(ImmutableList<Pair<IgniteReceiver, RelSource>> sources, NodesMapping mapping) {
         this.sources = sources;
         this.mapping = mapping;
     }
@@ -45,7 +45,7 @@ public class FragmentInfo {
         return mapping;
     }
 
-    public ImmutableList<Pair<IgniteReceiver, Source>> sources() {
+    public ImmutableList<Pair<IgniteReceiver, RelSource>> sources() {
         return sources;
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDerivedDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDerivedDistribution.java
new file mode 100644
index 0000000..6858b13
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDerivedDistribution.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.AbstractConverter;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.plan.volcano.VolcanoUtils;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DerivedDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class IgniteMdDerivedDistribution implements MetadataHandler<DerivedDistribution> {
+    /** */
+    private static final ThreadLocal<Convention> REQUESTED_CONVENTION = ThreadLocal.withInitial(() -> Convention.NONE);
+
+    public static final RelMetadataProvider SOURCE =
+        ReflectiveRelMetadataProvider.reflectiveSource(
+            IgniteMethod.DERIVED_DISTRIBUTIONS.method(), new IgniteMdDerivedDistribution());
+
+    @Override public MetadataDef<DerivedDistribution> getDef() {
+        return DerivedDistribution.DEF;
+    }
+
+    public List<IgniteDistribution> deriveDistributions(AbstractConverter rel, RelMetadataQuery mq) {
+        return Collections.emptyList();
+    }
+
+    public List<IgniteDistribution> deriveDistributions(RelNode rel, RelMetadataQuery mq) {
+        return F.asList(IgniteMdDistribution._distribution(rel, mq));
+    }
+
+    public List<IgniteDistribution> deriveDistributions(IgniteRel rel, RelMetadataQuery mq) {
+        return F.asList(IgniteMdDistribution._distribution(rel, mq));
+    }
+
+    public List<IgniteDistribution> deriveDistributions(LogicalTableScan rel, RelMetadataQuery mq) {
+        return F.asList(IgniteMdDistribution._distribution(rel, mq));
+    }
+
+    public List<IgniteDistribution> deriveDistributions(LogicalValues rel, RelMetadataQuery mq) {
+        return F.asList(IgniteMdDistribution._distribution(rel, mq));
+    }
+
+    public List<IgniteDistribution> deriveDistributions(LogicalProject rel, RelMetadataQuery mq) {
+        Mappings.TargetMapping mapping =
+            Project.getPartialMapping(rel.getInput().getRowType().getFieldCount(), rel.getProjects());
+
+        return Commons.transform(_deriveDistributions(rel.getInput(), mq), i -> i.apply(mapping));
+    }
+
+    public List<IgniteDistribution> deriveDistributions(SingleRel rel, RelMetadataQuery mq) {
+        if (rel instanceof IgniteRel)
+            return deriveDistributions((IgniteRel)rel, mq);
+
+        return _deriveDistributions(rel.getInput(), mq);
+    }
+
+    public List<IgniteDistribution> deriveDistributions(RelSubset rel, RelMetadataQuery mq) {
+        rel = VolcanoUtils.subset(rel, rel.getTraitSet().replace(REQUESTED_CONVENTION.get()));
+
+        HashSet<IgniteDistribution> res = new HashSet<>();
+
+        for (RelNode rel0 : rel.getRels())
+            res.addAll(_deriveDistributions(rel0, mq));
+
+        if (F.isEmpty(res)) {
+            RelSubset newRel = VolcanoUtils.subset(rel, rel.getTraitSet().replace(Convention.NONE));
+
+            if (newRel != rel) {
+                for (RelNode rel0 : newRel.getRels())
+                    res.addAll(_deriveDistributions(rel0, mq));
+            }
+        }
+
+        return new ArrayList<>(res);
+    }
+
+    public List<IgniteDistribution> deriveDistributions(HepRelVertex rel, RelMetadataQuery mq) {
+        return _deriveDistributions(rel.getCurrentRel(), mq);
+    }
+
+    public List<IgniteDistribution> deriveDistributions(LogicalFilter rel, RelMetadataQuery mq) {
+        return _deriveDistributions(rel.getInput(), mq);
+    }
+
+    public List<IgniteDistribution> deriveDistributions(LogicalJoin rel, RelMetadataQuery mq) {
+        List<IgniteDistribution> left = _deriveDistributions(rel.getLeft(), mq);
+        List<IgniteDistribution> right = _deriveDistributions(rel.getRight(), mq);
+
+        return Commons.transform(IgniteDistributions.suggestJoin(left, right, rel.analyzeCondition(), rel.getJoinType()),
+            IgniteDistributions.BiSuggestion::out);
+    }
+
+    private static List<IgniteDistribution> _deriveDistributions(RelNode rel, RelMetadataQuery mq) {
+        return RelMetadataQueryEx.wrap(mq).derivedDistributions(rel);
+    }
+
+    public static List<IgniteDistribution> deriveDistributions(RelNode rel, Convention convention, RelMetadataQuery mq) {
+        try {
+            REQUESTED_CONVENTION.set(convention);
+
+            return _deriveDistributions(rel, mq);
+        }
+        finally {
+            REQUESTED_CONVENTION.remove();
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index 7c0491d..9673835 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -16,186 +16,108 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.MetadataDef;
 import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexSlot;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.calcite.util.BuiltInMethod;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.internal.util.typedef.F;
 
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.SINGLE;
+import static org.apache.calcite.rel.RelDistribution.Type.ANY;
 
 /**
  *
  */
-public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.DistributionTraitMetadata> {
+public class IgniteMdDistribution implements MetadataHandler<BuiltInMetadata.Distribution> {
     public static final RelMetadataProvider SOURCE =
         ReflectiveRelMetadataProvider.reflectiveSource(
-            IgniteMethod.DISTRIBUTION_TRAIT.method(), new IgniteMdDistribution());
+            BuiltInMethod.DISTRIBUTION.method, new IgniteMdDistribution());
 
-    @Override public MetadataDef<IgniteMetadata.DistributionTraitMetadata> getDef() {
-        return IgniteMetadata.DistributionTraitMetadata.DEF;
+    @Override public MetadataDef<BuiltInMetadata.Distribution> getDef() {
+        return BuiltInMetadata.Distribution.DEF;
     }
 
-    public DistributionTrait getDistributionTrait(RelNode rel, RelMetadataQuery mq) {
+    public IgniteDistribution distribution(RelNode rel, RelMetadataQuery mq) {
         return DistributionTraitDef.INSTANCE.getDefault();
     }
 
-    public DistributionTrait getDistributionTrait(Filter filter, RelMetadataQuery mq) {
+    public IgniteDistribution distribution(Filter filter, RelMetadataQuery mq) {
         return filter(mq, filter.getInput(), filter.getCondition());
     }
 
-    public DistributionTrait getDistributionTrait(Project project, RelMetadataQuery mq) {
+    public IgniteDistribution distribution(Project project, RelMetadataQuery mq) {
         return project(mq, project.getInput(), project.getProjects());
     }
 
-    public DistributionTrait getDistributionTrait(Join join, RelMetadataQuery mq) {
+    public IgniteDistribution distribution(Join join, RelMetadataQuery mq) {
         return join(mq, join.getLeft(), join.getRight(), join.analyzeCondition(), join.getJoinType());
     }
 
-    public DistributionTrait getDistributionTrait(RelSubset rel, RelMetadataQuery mq) {
+    public IgniteDistribution distribution(RelSubset rel, RelMetadataQuery mq) {
         return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
-    public DistributionTrait getDistributionTrait(IgniteTableScan rel, RelMetadataQuery mq) {
+    public IgniteDistribution distribution(TableScan rel, RelMetadataQuery mq) {
         return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
-    public static DistributionTrait project(RelMetadataQuery mq, RelNode input, List<? extends RexNode> projects) {
-        DistributionTrait trait = distribution(input, mq);
-
-        if (trait.type() == HASH) {
-            ImmutableIntList keys = trait.keys();
-
-            if (keys.size() > projects.size())
-                return IgniteDistributions.random();
-
-            Map<Integer, Integer> m = new HashMap<>(projects.size());
-
-            for (Ord<? extends RexNode> node : Ord.zip(projects)) {
-                if (node.e instanceof RexInputRef)
-                    m.put( ((RexSlot) node.e).getIndex(), node.i);
-                else if (node.e.isA(SqlKind.CAST)) {
-                    RexNode operand = ((RexCall) node.e).getOperands().get(0);
-
-                    if (operand instanceof RexInputRef)
-                        m.put(((RexSlot) operand).getIndex(), node.i);
-                }
-            }
-
-            List<Integer> newKeys = new ArrayList<>(keys.size());
-
-            for (Integer key : keys) {
-                Integer mapped = m.get(key);
-
-                if (mapped == null)
-                    return IgniteDistributions.random();
-
-                newKeys.add(mapped);
-            }
-
-            return IgniteDistributions.hash(newKeys, trait.destinationFunctionFactory());
-        }
-
-        return trait;
-    }
-
-    public static DistributionTrait filter(RelMetadataQuery mq, RelNode input, RexNode condition) {
-        return distribution(input, mq);
-    }
-
-    public static DistributionTrait join(RelMetadataQuery mq, RelNode left, RelNode right, JoinInfo joinInfo, JoinRelType joinType) {
-        /*
-         * Distributions table:
-         *
-         * ===============INNER JOIN==============
-         * hash + hash = hash
-         * broadcast + hash = hash
-         * hash + broadcast = hash
-         * broadcast + broadcast = broadcast
-         * single + single = single
-         *
-         * ===============LEFT JOIN===============
-         * hash + hash = hash
-         * hash + broadcast = hash
-         * broadcast + broadcast = broadcast
-         * single + single = single
-         *
-         * ===============RIGHT JOIN==============
-         * hash + hash = hash
-         * broadcast + hash = hash
-         * broadcast + broadcast = broadcast
-         * single + single = single
-         *
-         * ===========FULL JOIN/CROSS JOIN========
-         * broadcast + broadcast = broadcast
-         * single + single = single
-         *
-         *
-         * others are impossible TODO assertions
-         */
-
-        DistributionTrait leftDistr = distribution(left, mq);
-        DistributionTrait rightDistr;
-
-        switch (joinType) {
-            case FULL:
-            case LEFT:
-                return leftDistr;
-            case INNER:
-                rightDistr = distribution(right, mq);
-
-                if (joinInfo.keys().isEmpty()
-                    || (leftDistr.type() == HASH || leftDistr.type() == SINGLE)
-                    || (leftDistr.type() == BROADCAST && rightDistr.type() == BROADCAST))
-                    return leftDistr;
-
-                if (rightDistr == null)
-                    rightDistr = distribution(right, mq);
-
-                assert rightDistr.type() == HASH;
-
-                return IgniteDistributions.hash(joinInfo.leftKeys, rightDistr.destinationFunctionFactory());
-            case RIGHT:
-                rightDistr = distribution(right, mq);
-
-                if (leftDistr.type() == SINGLE
-                    || (leftDistr.type() == HASH && rightDistr.type() == HASH)
-                    || (leftDistr.type() == BROADCAST && rightDistr.type() == BROADCAST))
-                    return leftDistr;
-                assert rightDistr.type() == HASH;
-
-                return IgniteDistributions.hash(joinInfo.leftKeys, rightDistr.destinationFunctionFactory());
-            default:
-                throw new UnsupportedOperationException();
-        }
-    }
-
-    public static DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) {
-        return RelMetadataQueryEx.wrap(mq).getDistributionTrait(rel);
+    public IgniteDistribution distribution(Values values, RelMetadataQuery mq) {
+        return IgniteDistributions.broadcast();
+    }
+
+    public IgniteDistribution distribution(Exchange exchange, RelMetadataQuery mq) {
+        return (IgniteDistribution) exchange.distribution;
+    }
+
+    public IgniteDistribution distribution(HepRelVertex rel, RelMetadataQuery mq) {
+        return _distribution(rel.getCurrentRel(), mq);
+    }
+
+    public static IgniteDistribution project(RelMetadataQuery mq, RelNode input, List<? extends RexNode> projects) {
+        return project(input.getRowType(), _distribution(input, mq), projects);
+    }
+
+    public static IgniteDistribution project(RelDataType inType, IgniteDistribution inDistr, List<? extends RexNode> projects) {
+        return inDistr.apply(Project.getPartialMapping(inType.getFieldCount(), projects));
+    }
+
+    public static IgniteDistribution filter(RelMetadataQuery mq, RelNode input, RexNode condition) {
+        return _distribution(input, mq);
+    }
+
+    public static IgniteDistribution join(RelMetadataQuery mq, RelNode left, RelNode right, JoinInfo joinInfo, JoinRelType joinType) {
+        return join(_distribution(left, mq), _distribution(right, mq), joinInfo, joinType);
+    }
+
+    public static IgniteDistribution join(IgniteDistribution left, IgniteDistribution right, JoinInfo joinInfo, JoinRelType joinType) {
+        return F.first(IgniteDistributions.suggestJoin(left, right, joinInfo, joinType)).out();
+    }
+
+    public static IgniteDistribution _distribution(RelNode rel, RelMetadataQuery mq) {
+        IgniteDistribution distr = rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+
+        if (distr.getType() != ANY)
+            return distr;
+
+        return (IgniteDistribution) mq.distribution(rel);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 503b424..c5f941e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.metadata.MetadataDef;
 import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
@@ -31,7 +32,9 @@ import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.util.Edge;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Edge;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
 /**
@@ -58,7 +61,7 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
         return fragmentInfo(rel.getInput(), mq);
     }
 
-    public FragmentInfo getFragmentInfo(BiRel rel, RelMetadataQuery mq) {
+    public FragmentInfo getFragmentInfo(Join rel, RelMetadataQuery mq) {
         mq = RelMetadataQueryEx.wrap(mq);
 
         FragmentInfo left = fragmentInfo(rel.getLeft(), mq);
@@ -84,24 +87,24 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
         }
     }
 
-    private OptimisticPlanningException planningException(BiRel rel, Exception cause, boolean splitLeft) {
-        String msg = "Failed to calculate physical distribution";
-
-        if (splitLeft)
-            return new OptimisticPlanningException(msg, new Edge(rel, rel.getLeft(), 0), cause);
-
-        return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause);
-    }
-
     public FragmentInfo getFragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
         return new FragmentInfo(Pair.of(rel, rel.source()));
     }
 
     public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery mq) {
-        return rel.fragmentInfo();
+        return rel.getTable().unwrap(IgniteTable.class).fragmentInfo(Commons.plannerContext(rel));
     }
 
     public static FragmentInfo fragmentInfo(RelNode rel, RelMetadataQuery mq) {
         return RelMetadataQueryEx.wrap(mq).getFragmentLocation(rel);
     }
+
+    private OptimisticPlanningException planningException(BiRel rel, Exception cause, boolean splitLeft) {
+        String msg = "Failed to calculate physical distribution";
+
+        if (splitLeft)
+            return new OptimisticPlanningException(msg, new Edge(rel, rel.getLeft(), 0), cause);
+
+        return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 7fe6fbf..5579a14 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import com.google.common.collect.ImmutableList;
+import java.util.List;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
 import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
@@ -25,7 +26,7 @@ import org.apache.calcite.rel.metadata.MetadataDef;
 import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
 /**
@@ -35,23 +36,11 @@ public class IgniteMetadata {
     public static final RelMetadataProvider METADATA_PROVIDER =
         ChainedRelMetadataProvider.of(
             ImmutableList.of(
+                IgniteMdDerivedDistribution.SOURCE,
                 IgniteMdDistribution.SOURCE,
                 IgniteMdFragmentInfo.SOURCE,
                 DefaultRelMetadataProvider.INSTANCE));
 
-    public interface DistributionTraitMetadata extends Metadata {
-        MetadataDef<DistributionTraitMetadata> DEF = MetadataDef.of(DistributionTraitMetadata.class,
-            DistributionTraitMetadata.Handler.class, IgniteMethod.DISTRIBUTION_TRAIT.method());
-
-        /** Determines how the rows are distributed. */
-        DistributionTrait getDistributionTrait();
-
-        /** Handler API. */
-        interface Handler extends MetadataHandler<DistributionTraitMetadata> {
-            DistributionTrait getDistributionTrait(RelNode r, RelMetadataQuery mq);
-        }
-    }
-
     public interface FragmentMetadata extends Metadata {
         MetadataDef<FragmentMetadata> DEF = MetadataDef.of(FragmentMetadata.class,
             FragmentMetadata.Handler.class, IgniteMethod.FRAGMENT_INFO.method());
@@ -64,4 +53,16 @@ public class IgniteMetadata {
             FragmentInfo getFragmentInfo(RelNode r, RelMetadataQuery mq);
         }
     }
+
+    public interface DerivedDistribution extends Metadata {
+        MetadataDef<DerivedDistribution> DEF = MetadataDef.of(DerivedDistribution.class,
+            DerivedDistribution.Handler.class, IgniteMethod.DERIVED_DISTRIBUTIONS.method());
+
+        List<IgniteDistribution> deriveDistributions();
+
+        /** Handler API. */
+        interface Handler extends MetadataHandler<DerivedDistribution> {
+            List<IgniteDistribution> deriveDistributions(RelNode r, RelMetadataQuery mq);
+        }
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
index fb609ea..f0d3fe3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
@@ -16,7 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import org.apache.ignite.internal.processors.query.calcite.util.Edge;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Edge;
 
 /**
  *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index c93d04b..74d7682 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -16,7 +16,8 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import java.util.Arrays;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -27,7 +28,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -35,22 +36,21 @@ import org.jetbrains.annotations.NotNull;
  */
 public class RelMetadataQueryEx extends RelMetadataQuery {
     private static final RelMetadataQueryEx PROTO = new RelMetadataQueryEx();
-    private static final JaninoRelMetadataProvider PROVIDER = JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
+    public static final JaninoRelMetadataProvider PROVIDER = JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
 
     static {
-        PROVIDER.register(Arrays.asList(
-            IgniteExchange.class,
-            IgniteFilter.class,
-            IgniteJoin.class,
-            IgniteProject.class,
-            IgniteTableScan.class,
-            IgniteReceiver.class,
-            IgniteSender.class
-        ));
+        PROVIDER.register(ImmutableList.of(
+                IgniteExchange.class,
+                IgniteReceiver.class,
+                IgniteSender.class,
+                IgniteFilter.class,
+                IgniteProject.class,
+                IgniteJoin.class,
+                IgniteTableScan.class));
     }
 
-    private IgniteMetadata.DistributionTraitMetadata.Handler distributionTraitHandler;
     private IgniteMetadata.FragmentMetadata.Handler sourceDistributionHandler;
+    private IgniteMetadata.DerivedDistribution.Handler derivedDistributionsHandler;
 
     @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
     public static RelMetadataQueryEx instance() {
@@ -67,22 +67,22 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
     private RelMetadataQueryEx(@NotNull RelMetadataQueryEx parent) {
         super(PROVIDER, parent);
 
-        distributionTraitHandler = parent.distributionTraitHandler;
         sourceDistributionHandler = parent.sourceDistributionHandler;
+        derivedDistributionsHandler = parent.derivedDistributionsHandler;
     }
 
     private RelMetadataQueryEx(@NotNull RelMetadataQuery parent) {
         super(PROVIDER, parent);
 
-        distributionTraitHandler = PROTO.distributionTraitHandler;
         sourceDistributionHandler = PROTO.sourceDistributionHandler;
+        derivedDistributionsHandler = PROTO.derivedDistributionsHandler;
     }
 
     private RelMetadataQueryEx() {
         super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
 
-        distributionTraitHandler = initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
         sourceDistributionHandler = initialHandler(IgniteMetadata.FragmentMetadata.Handler.class);
+        derivedDistributionsHandler = initialHandler(IgniteMetadata.DerivedDistribution.Handler.class);
     }
 
     public FragmentInfo getFragmentLocation(RelNode rel) {
@@ -95,12 +95,12 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
         }
     }
 
-    public DistributionTrait getDistributionTrait(RelNode rel) {
+    public List<IgniteDistribution> derivedDistributions(RelNode rel) {
         for (;;) {
             try {
-                return distributionTraitHandler.getDistributionTrait(rel, this);
+                return derivedDistributionsHandler.deriveDistributions(rel, this);
             } catch (JaninoRelMetadataProvider.NoHandler e) {
-                distributionTraitHandler = revise(e.relClass, IgniteMetadata.DistributionTraitMetadata.DEF);
+                derivedDistributionsHandler = revise(e.relClass, IgniteMetadata.DerivedDistribution.DEF);
             }
         }
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
index 192e686..b81961f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
@@ -35,9 +35,7 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
 
 /**
@@ -84,11 +82,11 @@ public class DistributedExecution implements QueryExecution {
 
             RelTraitSet desired = rel.getTraitSet()
                 .replace(relRoot.collation)
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(RelDistributions.ANY)
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         } catch (SqlParseException | ValidationException e) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index dec15d4..5c12ffa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -38,6 +38,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.plan.volcano.VolcanoUtils;
 import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
@@ -46,7 +47,9 @@ import org.apache.calcite.rel.core.TableFunctionScan;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
@@ -70,9 +73,8 @@ import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.serialize.Graph;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.GraphToRelConverter;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
@@ -159,12 +161,14 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         metadataProvider = null;
         validator = null;
 
+        RelMetadataQuery.THREAD_PROVIDERS.remove();
+
         open = false;
     }
 
     private void ready() {
         if (!open) {
-            planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), context);
+            planner = VolcanoUtils.impatient(new VolcanoPlanner(frameworkConfig.getCostFactory(), context));
             planner.setExecutor(executor);
             metadataProvider = new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner);
 
@@ -213,7 +217,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
     public RelNode convert(RelGraph graph) {
         ready();
 
-        RelOptCluster cluster = RelOptCluster.create(planner, createRexBuilder());
+        RelOptCluster cluster = createCluster(createRexBuilder());
         RelBuilder relBuilder = createRelBuilder(cluster, createCatalogReader());
 
         return new GraphToRelConverter(this, relBuilder, operatorTable).convert(graph);
@@ -224,7 +228,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         ready();
 
         RexBuilder rexBuilder = createRexBuilder();
-        RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+        RelOptCluster cluster = createCluster(rexBuilder);
         SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
             .withConfig(sqlToRelConverterConfig)
             .withTrimUnusedFields(false)
@@ -242,8 +246,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
     public QueryPlan plan(RelNode rel) {
         ready();
 
-        if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
-            throw new IllegalArgumentException("IGNITE_CONVENTION is required.");
+        if (rel.getConvention() != IgniteConvention.INSTANCE)
+            throw new IllegalArgumentException("Physical node is required.");
 
         return new Splitter().go((IgniteRel) rel);
     }
@@ -251,10 +255,10 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
     public Graph graph(RelNode rel) {
         ready();
 
-        if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
-            throw new IllegalArgumentException("IGNITE_CONVENTION is required.");
+        if (rel.getConvention() != IgniteConvention.INSTANCE)
+            throw new IllegalArgumentException("Physical node is required.");
 
-        return new RelToGraphConverter().convert((IgniteRel) rel);
+        return new RelToGraphConverter().go((IgniteRel) rel);
     }
 
     /** {@inheritDoc} */
@@ -277,7 +281,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         validator.setIdentifierExpansion(true);
 
         RexBuilder rexBuilder = createRexBuilder();
-        RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+        RelOptCluster cluster = createCluster(rexBuilder);
         SqlToRelConverter.Config config = SqlToRelConverter
             .configBuilder()
             .withConfig(sqlToRelConverterConfig)
@@ -294,6 +298,15 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         return root2.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
     }
 
+    private RelOptCluster createCluster(RexBuilder rexBuilder) {
+        RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+
+        cluster.setMetadataProvider(metadataProvider);
+        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(metadataProvider));
+
+        return cluster;
+    }
+
     /** {@inheritDoc} */
     @Override public RelNode transform(int programIdx, RelTraitSet targetTraits, RelNode rel) {
         ready();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
index dc20880..352980f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
@@ -27,9 +27,6 @@ import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
 import org.apache.ignite.internal.processors.query.calcite.exchange.ExchangeProcessor;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 
 /**
  *
@@ -43,14 +40,13 @@ public final class PlannerContext implements Context {
     private final GridKernalContext kernalContext;
     private final CalciteQueryProcessor queryProcessor;
     private final MappingService mappingService;
-    private final TableDistributionService distributionService;
     private final ExchangeProcessor exchangeProcessor;
 
     private IgnitePlanner planner;
 
     private PlannerContext(Context parentContext, Query query, AffinityTopologyVersion topologyVersion,
         SchemaPlus schema, IgniteLogger logger, GridKernalContext kernalContext, CalciteQueryProcessor queryProcessor, MappingService mappingService,
-        TableDistributionService distributionService, ExchangeProcessor exchangeProcessor) {
+        ExchangeProcessor exchangeProcessor) {
         this.parentContext = parentContext;
         this.query = query;
         this.topologyVersion = topologyVersion;
@@ -59,7 +55,6 @@ public final class PlannerContext implements Context {
         this.kernalContext = kernalContext;
         this.queryProcessor = queryProcessor;
         this.mappingService = mappingService;
-        this.distributionService = distributionService;
         this.exchangeProcessor = exchangeProcessor;
     }
 
@@ -99,10 +94,6 @@ public final class PlannerContext implements Context {
         return mappingService;
     }
 
-    public TableDistributionService distributionService() {
-        return distributionService;
-    }
-
     public ExchangeProcessor exchangeProcessor() {
         return exchangeProcessor;
     }
@@ -125,10 +116,6 @@ public final class PlannerContext implements Context {
         return mappingService.distributed(cacheId, topVer);
     }
 
-    public DistributionTrait distributionTrait(int cacheId, RowType rowType) {
-        return distributionService.distribution(cacheId, rowType);
-    }
-
     public QueryProvider queryProvider() {
         return null; // TODO
     }
@@ -153,7 +140,6 @@ public final class PlannerContext implements Context {
         private GridKernalContext kernalContext;
         private CalciteQueryProcessor queryProcessor;
         private MappingService mappingService;
-        private TableDistributionService distributionService;
         private ExchangeProcessor exchangeProcessor;
 
         public Builder parentContext(Context parentContext) {
@@ -196,18 +182,13 @@ public final class PlannerContext implements Context {
             return this;
         }
 
-        public Builder distributionService(TableDistributionService distributionService) {
-            this.distributionService = distributionService;
-            return this;
-        }
-
         public Builder exchangeProcessor(ExchangeProcessor exchangeProcessor) {
             this.exchangeProcessor = exchangeProcessor;
             return this;
         }
 
         public PlannerContext build() {
-            return new PlannerContext(parentContext, query, topologyVersion, schema, logger, kernalContext, queryProcessor, mappingService, distributionService, exchangeProcessor);
+            return new PlannerContext(parentContext, query, topologyVersion, schema, logger, kernalContext, queryProcessor, mappingService, exchangeProcessor);
         }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
similarity index 61%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index a243f52..906c20e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rule;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import org.apache.calcite.rel.rules.SubQueryRemoveRule;
 import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.RuleSets;
-import org.apache.ignite.internal.processors.query.calcite.exec.Interpretable;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.rule.FilterConverter;
+import org.apache.ignite.internal.processors.query.calcite.rule.JoinConverter;
+import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverter;
+import org.apache.ignite.internal.processors.query.calcite.rule.TableScanConverter;
 
 /**
  *
@@ -29,21 +32,21 @@ public enum PlannerPhase {
     /** */
     SUBQUERY_REWRITE("Sub-queries rewrites") {
         @Override public RuleSet getRules(PlannerContext ctx) {
-            return RuleSets.ofList(IgniteRules.SUBQUERY_REWRITE_RULES);
+            return RuleSets.ofList(
+                SubQueryRemoveRule.FILTER,
+                SubQueryRemoveRule.PROJECT,
+                SubQueryRemoveRule.JOIN);
         }
     },
 
     /** */
-    LOGICAL("Logical planning") {
+    OPTIMIZATION("Main optimization phase") {
         @Override public RuleSet getRules(PlannerContext ctx) {
-            return RuleSets.ofList(IgniteRules.logicalRules(ctx));
-        }
-    },
-
-    /** */
-    PHYSICAL("Execution tree building") {
-        @Override public RuleSet getRules(PlannerContext ctx) {
-            return RuleSets.ofList(Interpretable.RULES);
+            return RuleSets.ofList(
+                new TableScanConverter(),
+                new JoinConverter(),
+                new ProjectConverter(),
+                new FilterConverter());
         }
     };
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
similarity index 92%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
index a4a8db8..cddc4d5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerType.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.rule;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 /**
  *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
new file mode 100644
index 0000000..c4fad9f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.AbstractConverter;
+
+/**
+ *
+ */
+public class IgniteConvention extends Convention.Impl {
+    public static final Convention INSTANCE = new IgniteConvention();
+
+    private IgniteConvention() {
+        super("IGNITE", IgniteRel.class);
+    }
+
+    @Override public void register(RelOptPlanner planner) {
+        planner.addRule(AbstractConverter.ExpandConversionRule.INSTANCE);
+    }
+
+    @Override public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+        return fromTraits.contains(INSTANCE) && toTraits.contains(INSTANCE);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 51be890..f932020 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -16,52 +16,25 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
+import org.apache.calcite.rel.core.Exchange;
 
 /**
  *
  */
-public final class IgniteExchange extends SingleRel implements IgniteRel {
-    /**
-     * Creates a <code>SingleRel</code>.
-     *
-     * @param cluster Cluster this relational expression belongs to
-     * @param traits Node traits.
-     * @param input   Input relational expression
-     */
-    public IgniteExchange(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
-        super(cluster, traits, input);
+public class IgniteExchange extends Exchange implements IgniteRel {
+    public IgniteExchange(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RelDistribution distribution) {
+        super(cluster, traitSet, input, distribution);
     }
 
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rowCount = mq.getRowCount(this);
-        double bytesPerRow = getRowType().getFieldCount() * 4;
-        return planner.getCostFactory().makeCost(
-            Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
+    @Override public Exchange copy(RelTraitSet traitSet, RelNode newInput, RelDistribution newDistribution) {
+        return new IgniteExchange(getCluster(), traitSet, newInput, newDistribution);
     }
 
-    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        return new IgniteExchange(getCluster(), traitSet, sole(inputs));
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T implement(RelImplementor<T> implementor) {
+    @Override public <T> T implement(Implementor<T> implementor) {
         return implementor.implement(this);
     }
-
-    @Override public RelWriter explainTerms(RelWriter pw) {
-        return super.explainTerms(pw)
-            .item("distribution", getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 5a96897..2a0594e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,66 +13,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import com.google.common.collect.ImmutableSet;
-import java.util.Objects;
-import java.util.Set;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-
-public final class IgniteFilter extends Filter implements IgniteRel {
-  private final Set<CorrelationId> variablesSet;
-
-  public IgniteFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
-      RexNode condition, Set<CorrelationId> variablesSet) {
-    super(cluster, traitSet, child, condition);
-    this.variablesSet = Objects.requireNonNull(variablesSet);
-  }
-
-    @Override public Set<CorrelationId> getVariablesSet() {
-    return variablesSet;
-  }
-
-  @Override public IgniteFilter copy(RelTraitSet traitSet, RelNode input,
-      RexNode condition) {
-    return new IgniteFilter(getCluster(), traitSet, input, condition, variablesSet);
-  }
-
-  /** {@inheritDoc} */
-  @Override public <T> T implement(RelImplementor<T> implementor) {
-    return implementor.implement(this);
-  }
 
-  @Override public RelWriter explainTerms(RelWriter pw) {
-    return super.explainTerms(pw)
-        .itemIf("variablesSet", variablesSet, !variablesSet.isEmpty());
-  }
-
-  public static IgniteFilter create(Filter filter, RelNode input) {
-    RexNode condition = filter.getCondition();
-    Set<CorrelationId> variablesSet = filter.getVariablesSet();
-
-    return create(input, condition, variablesSet);
-  }
-
-  public static IgniteFilter create(RelNode input, RexNode condition, Set<CorrelationId> variablesSet) {
-    RelOptCluster cluster = input.getCluster();
-    RelMetadataQuery mq = cluster.getMetadataQuery();
-
-    RelTraitSet traits = cluster.traitSet()
-        .replace(IgniteRel.IGNITE_CONVENTION)
-        .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.filter(mq, input, condition));
-
-    return new IgniteFilter(cluster, traits, input, condition, ImmutableSet.copyOf(variablesSet));
-  }
-}
\ No newline at end of file
+/**
+ *
+ */
+public class IgniteFilter extends Filter implements IgniteRel {
+    public IgniteFilter(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode condition) {
+        super(cluster, traits, input, condition);
+    }
+
+    @Override public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+        return new IgniteFilter(getCluster(), traitSet, input, condition);
+    }
+
+    @Override public <T> T implement(Implementor<T> implementor) {
+        return implementor.implement(this);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
index e576897..344f4a2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,53 +13,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.Set;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-
-public final class IgniteJoin extends Join implements IgniteRel {
-  private final boolean semiJoinDone;
-
-  public IgniteJoin(
-      RelOptCluster cluster,
-      RelTraitSet traitSet,
-      RelNode left,
-      RelNode right,
-      RexNode condition,
-      Set<CorrelationId> variablesSet,
-      JoinRelType joinType,
-      boolean semiJoinDone) {
-    super(cluster, traitSet, left, right, condition, variablesSet, joinType);
-    this.semiJoinDone = semiJoinDone;
-  }
 
-  @Override public IgniteJoin copy(RelTraitSet traitSet, RexNode conditionExpr,
-      RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
-    return new IgniteJoin(getCluster(), traitSet, left, right, conditionExpr, variablesSet, joinType, semiJoinDone);
-  }
-
-  /** {@inheritDoc} */
-  @Override public <T> T implement(RelImplementor<T> implementor) {
-    return implementor.implement(this);
-  }
+/**
+ *
+ */
+public class IgniteJoin extends Join implements IgniteRel {
+    public IgniteJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+        super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+    }
 
-  @Override public RelWriter explainTerms(RelWriter pw) {
-    // Don't ever print semiJoinDone=false. This way, we
-    // don't clutter things up in optimizers that don't use semi-joins.
-    return super.explainTerms(pw)
-        .itemIf("semiJoinDone", semiJoinDone, semiJoinDone);
-  }
+    @Override public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+        return new IgniteJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType);
+    }
 
-  @Override public boolean isSemiJoinDone() {
-    return semiJoinDone;
-  }
-}
\ No newline at end of file
+    @Override public <T> T implement(Implementor<T> implementor) {
+        return implementor.implement(this);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index fe0d55e..03fe6ca 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
@@ -21,55 +21,22 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-
-public final class IgniteProject extends Project implements IgniteRel {
-  public IgniteProject(
-      RelOptCluster cluster,
-      RelTraitSet traitSet,
-      RelNode input,
-      List<? extends RexNode> projects,
-      RelDataType rowType) {
-    super(cluster, traitSet, input, projects, rowType);
-  }
-
-  @Override public IgniteProject copy(RelTraitSet traitSet, RelNode input,
-      List<RexNode> projects, RelDataType rowType) {
-    return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
-  }
-
-  /** {@inheritDoc} */
-  @Override public <T> T implement(RelImplementor<T> implementor) {
-    return implementor.implement(this);
-  }
 
-  /** Creates a LogicalProject. */
-  public static IgniteProject create(final RelNode input, List<? extends RexNode> projects, List<String> fieldNames) {
-    final RelOptCluster cluster = input.getCluster();
-    final RelDataType rowType =
-        RexUtil.createStructType(cluster.getTypeFactory(), projects,
-            fieldNames, SqlValidatorUtil.F_SUGGESTER);
-    return create(input, projects, rowType);
-  }
-
-  public static IgniteProject create(Project project, RelNode input) {
-    return create(input, project.getProjects(), project.getRowType());
-  }
-
-  public static IgniteProject create(RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-    RelOptCluster cluster = input.getCluster();
-    RelMetadataQuery mq = cluster.getMetadataQuery();
-    RelTraitSet traits = cluster.traitSet()
-        .replace(IgniteRel.IGNITE_CONVENTION)
-        .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.project(mq, input, projects));
-
-    return new IgniteProject(cluster, traits, input, projects, rowType);
-  }
+/**
+ *
+ */
+public class IgniteProject extends Project implements IgniteRel {
+    public IgniteProject(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+        super(cluster, traits, input, projects, rowType);
+    }
+
+    @Override public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+        return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
+    }
+
+    @Override public <T> T implement(Implementor<T> implementor) {
+        return implementor.implement(this);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 98cf908..60d5477 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -16,41 +16,46 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
+import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
 /**
  *
  */
-public final class IgniteReceiver extends AbstractRelNode implements IgniteRel {
-    private final Source source;
-
-    /**
-     * @param cluster Cluster this relational expression belongs to
-     * @param traits Trait set.
-     */
-    public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Source source) {
+public class IgniteReceiver extends AbstractRelNode implements IgniteRel {
+    private RelSource source;
+
+    public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, RelSource source) {
         super(cluster, traits);
+
         this.rowType = rowType;
         this.source = source;
     }
 
-    /** {@inheritDoc} */
-    @Override public <T> T implement(RelImplementor<T> implementor) {
-        return implementor.implement(this);
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteReceiver(getCluster(), traitSet, rowType, source);
     }
 
-    public DistributionTrait distribution() {
-        return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+    @Override public <T> T implement(Implementor<T> implementor) {
+        return implementor.implement(this);
     }
 
-    public Source source() {
+    public RelSource source() {
         return source;
     }
+
+    public void source(RelSource source) {
+        this.source = source;
+    }
+
+    public IgniteDistribution distribution() {
+        return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index dbfbb3f..b5d876f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,22 +16,11 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 /**
  *
  */
 public interface IgniteRel extends RelNode {
-    Convention IGNITE_CONVENTION = new Convention.Impl("IGNITE_LOGICAL", IgniteRel.class) {
-        /** */
-        @Override public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
-            return fromTraits.containsIfApplicable(IGNITE_CONVENTION)
-                && toTraits.containsIfApplicable(IGNITE_CONVENTION); // Enables trait definition conversion
-        }
-    };
-
-    <T> T implement(RelImplementor<T> implementor);
+    <T> T implement(Implementor<T> implementor);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
similarity index 78%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
index 9734885..7d3efc9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
@@ -14,17 +14,10 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.util;
+package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 
 /**
  *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index e89712a..e50b7bf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -21,59 +21,37 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
 
 /**
  *
  */
-public final class IgniteSender extends SingleRel implements IgniteRel {
-    private Target target;
+public class IgniteSender extends SingleRel implements IgniteRel {
+    private RelTarget target;
 
-    /**
-     * Creates a <code>SingleRel</code>.
-     * @param cluster Cluster this relational expression belongs to
-     * @param traits Trait set.
-     * @param input Input relational expression
-     */
-    public IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    public IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelTarget target) {
         super(cluster, traits, input);
+
+        this.target = target;
     }
 
-    private IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input, Target target) {
+    public IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
         super(cluster, traits, input);
-
-        this.target = target;
     }
 
     @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
         return new IgniteSender(getCluster(), traitSet, sole(inputs), target);
     }
 
-    /** {@inheritDoc} */
-    @Override public <T> T implement(RelImplementor<T> implementor) {
+    @Override public <T> T implement(Implementor<T> implementor) {
         return implementor.implement(this);
     }
 
-    public void init(Target target) {
-        this.target = target;
-    }
-
-    public Target target() {
+    public RelTarget target() {
         return target;
     }
 
-    public static IgniteSender create(RelNode input, Target target) {
-        RelOptCluster cluster = input.getCluster();
-        RelMetadataQuery mq = cluster.getMetadataQuery();
-
-        RelTraitSet traits = cluster.traitSet()
-            .replace(IgniteRel.IGNITE_CONVENTION)
-            .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.distribution(input, mq));
-
-        return new IgniteSender(cluster, traits, input, target);
+    public void target(RelTarget target) {
+        this.target = target;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index c529aee..5f01899 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
@@ -22,29 +22,20 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-
-public final class IgniteTableScan extends TableScan implements IgniteRel {
-  public IgniteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
-    super(cluster, traitSet, table);
-  }
 
-  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    assert inputs.isEmpty();
-
-    return this;
-  }
+/**
+ *
+ */
+public class IgniteTableScan extends TableScan implements IgniteRel {
+    public IgniteTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+        super(cluster, traitSet, table);
+    }
 
-  /** {@inheritDoc} */
-  @Override public <T> T implement(RelImplementor<T> implementor) {
-    return implementor.implement(this);
-  }
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return this;
+    }
 
-  public FragmentInfo fragmentInfo() {
-    return getTable().unwrap(IgniteTable.class)
-        .fragmentInfo(Commons.plannerContext(getCluster().getPlanner().getContext()));
-  }
+    @Override public <T> T implement(Implementor<T> implementor) {
+        return implementor.implement(this);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
similarity index 58%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
index c67964f..50da38c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionType.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
@@ -14,32 +14,29 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.trait;
+package org.apache.ignite.internal.processors.query.calcite.rel;
 
 /**
  *
  */
-public enum DistributionType {
-    HASH("hash"),
-    RANDOM("random"),
-    BROADCAST("broadcast"),
-    SINGLE("single"),
-    ANY("any");
-
-    /** */
-    private final String description;
-
-    /**
-     *
-     */
-    DistributionType(String description) {
-        this.description = description;
-    }
+public interface Implementor<T> extends RelOp<IgniteRel, T> {
+    T implement(IgniteSender rel);
+
+    T implement(IgniteFilter rel);
+
+    T implement(IgniteProject rel);
+
+    T implement(IgniteJoin rel);
+
+    T implement(IgniteTableScan rel);
+
+    T implement(IgniteReceiver rel);
+
+    T implement(IgniteExchange rel);
+
+    T implement(IgniteRel other);
 
-    /**
-     *
-     */
-    @Override public String toString() {
-        return description;
+    @Override default T go(IgniteRel rel) {
+        return rel.implement(this);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelOp.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/RelOp.java
similarity index 93%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelOp.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/RelOp.java
index 6bc90bf..fb073c9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelOp.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/RelOp.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.util;
+package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import org.apache.calcite.rel.RelNode;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractVariableConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractVariableConverter.java
new file mode 100644
index 0000000..02c7d53
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractVariableConverter.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public abstract class AbstractVariableConverter extends ConverterRule {
+    protected AbstractVariableConverter(Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix) {
+        super(clazz, in, out, descriptionPrefix);
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+        RelNode rel = call.rel(0);
+        if (rel.getTraitSet().contains(getInTrait())) {
+            for (RelNode newRel : convert(rel, false))
+                call.transformTo(newRel);
+        }
+    }
+
+    @Override public RelNode convert(RelNode rel) {
+        return F.first(convert(rel, true));
+    }
+
+    public abstract List<RelNode> convert(RelNode rel, boolean firstOnly);
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterConverter.java
new file mode 100644
index 0000000..6a6ce47
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterConverter.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDerivedDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class FilterConverter extends AbstractVariableConverter {
+    public FilterConverter() {
+        super(LogicalFilter.class, Convention.NONE, IgniteConvention.INSTANCE, "FilterConverter");
+    }
+
+    @Override public List<RelNode> convert(RelNode rel, boolean firstOnly) {
+        LogicalFilter filter = (LogicalFilter) rel;
+
+        RelNode input = convert(filter.getInput(), IgniteConvention.INSTANCE);
+
+        RelOptCluster cluster = rel.getCluster();
+        RelMetadataQuery mq = cluster.getMetadataQuery();
+
+        List<IgniteDistribution> distrs = IgniteMdDerivedDistribution.deriveDistributions(input, IgniteConvention.INSTANCE, mq);
+
+        return firstOnly ? F.asList(create(filter, input, F.first(distrs))) :
+            Commons.transform(distrs, d -> create(filter, input, d));
+    }
+
+    private static IgniteFilter create(LogicalFilter filter, RelNode input, IgniteDistribution distr) {
+        RelTraitSet traits = filter.getTraitSet()
+            .replace(distr)
+            .replace(IgniteConvention.INSTANCE);
+
+        return new IgniteFilter(filter.getCluster(), traits, convert(input, distr), filter.getCondition());
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
deleted file mode 100644
index 4d82c98..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RelOp;
-
-/**
- *
- */
-public class IgniteFilterRule extends RelOptRule {
-    public static final RelOptRule INSTANCE = new IgniteFilterRule();
-
-    private IgniteFilterRule() {
-        super(Commons.any(LogicalFilter.class, RelNode.class), "IgniteFilterRule");
-    }
-
-    @Override public void onMatch(RelOptRuleCall call) {
-        LogicalFilter filter = call.rel(0);
-        RelOptCluster cluster = filter.getCluster();
-        RelNode input = filter.getInput();
-
-        final RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION);
-
-        RelNode converted = convert(input, traitSet);
-
-        RelOp<LogicalFilter, Boolean> transformOp = Commons.transformSubset(call, converted, IgniteFilter::create);
-
-        if (!transformOp.go(filter))
-            call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(converted)));
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
deleted file mode 100644
index 3f380d3..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import java.util.Objects;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.SINGLE;
-
-/**
- *
- */
-public class IgniteJoinRule extends RelOptRule {
-    public static final RelOptRule INSTANCE = new IgniteJoinRule();
-
-    public IgniteJoinRule() {
-        super(Commons.any(Join.class, RelNode.class), RelFactories.LOGICAL_BUILDER, "IgniteJoinRule");
-    }
-
-    @Override public void onMatch(RelOptRuleCall call) {
-        Join join = call.rel(0);
-
-        RelOptCluster cluster = join.getCluster();
-
-        RelTraitSet traitSet = cluster.traitSet()
-            .replace(IgniteRel.IGNITE_CONVENTION);
-
-        RelNode left = convert(join.getLeft(), traitSet);
-        RelNode right = convert(join.getRight(), traitSet);
-
-        RelMetadataQuery mq = call.getMetadataQuery();
-
-        List<DistributionTrait> leftDerived;
-        List<DistributionTrait> rightDerived;
-
-        if ((leftDerived = IgniteDistributions.deriveDistributions(left, mq)).isEmpty()
-            || (rightDerived = IgniteDistributions.deriveDistributions(right, mq)).isEmpty()) {
-            call.transformTo(join.copy(join.getTraitSet(), ImmutableList.of(left, right)));
-
-            return;
-        }
-
-        List<DistributionTrait> leftDists = Commons.concat(leftDerived,
-            IgniteDistributions.hash(join.analyzeCondition().leftKeys));
-
-        List<DistributionTrait> rightDists = Commons.concat(rightDerived,
-            IgniteDistributions.hash(join.analyzeCondition().rightKeys));
-
-        for (DistributionTrait leftDist0 : leftDists) {
-            for (DistributionTrait rightDist0 : rightDists) {
-                if (canTransform(join, leftDist0, rightDist0))
-                    transform(call, join, mq, leftDist0, rightDist0);
-            }
-        }
-    }
-
-    private void transform(RelOptRuleCall call, Join join, RelMetadataQuery mq, DistributionTrait leftDist, DistributionTrait rightDist) {
-        RelOptCluster cluster = join.getCluster();
-
-        RelTraitSet leftTraits = cluster.traitSet()
-            .replace(IgniteRel.IGNITE_CONVENTION)
-            .replace(leftDist);
-
-        RelTraitSet rightTraits = cluster.traitSet()
-            .replace(IgniteRel.IGNITE_CONVENTION)
-            .replace(rightDist);
-
-        RelNode left = convert(join.getLeft(), leftTraits);
-        RelNode right = convert(join.getRight(), rightTraits);
-
-        RelTraitSet traitSet = cluster.traitSet()
-            .replace(IgniteRel.IGNITE_CONVENTION)
-            .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.join(mq, left, right, join.analyzeCondition(), join.getJoinType()));
-
-        call.transformTo(new IgniteJoin(cluster, traitSet, left, right,
-            join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone()));
-    }
-
-    private boolean canTransform(Join join, DistributionTrait leftDist, DistributionTrait rightDist) {
-        if (leftDist.type() == BROADCAST
-            && rightDist.type() == BROADCAST)
-            return true;
-
-        if (rightDist.type() == SINGLE
-            && leftDist.type() == SINGLE)
-            return true;
-
-        if (leftDist.type() == BROADCAST
-            && rightDist.type() == HASH
-            && Objects.equals(rightDist.keys(), join.analyzeCondition().rightKeys))
-            return true;
-
-        if (rightDist.type() == BROADCAST
-            && leftDist.type() == HASH
-            && Objects.equals(leftDist.keys(), join.analyzeCondition().leftKeys))
-            return true;
-
-        if (leftDist.type() == HASH
-            && rightDist.type() == HASH
-            && Objects.equals(leftDist.keys(), join.analyzeCondition().leftKeys)
-            && Objects.equals(rightDist.keys(), join.analyzeCondition().rightKeys)
-            && Objects.equals(rightDist.destinationFunctionFactory(), leftDist.destinationFunctionFactory()))
-            return true;
-
-        return false;
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
deleted file mode 100644
index f238d08..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RelOp;
-
-/**
- *
- */
-public class IgniteProjectRule extends RelOptRule {
-    public static final RelOptRule INSTANCE = new IgniteProjectRule();
-
-    private <R extends RelNode> IgniteProjectRule() {
-        super(Commons.any(LogicalProject.class, RelNode.class), RelFactories.LOGICAL_BUILDER, "IgniteProjectRule");
-    }
-
-    @Override public void onMatch(RelOptRuleCall call) {
-        LogicalProject project = call.rel(0);
-        RelOptCluster cluster = project.getCluster();
-        RelNode input = project.getInput();
-
-        final RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION);
-
-        RelNode converted = convert(input, traitSet);
-
-        RelOp<LogicalProject, Boolean> transformOp = Commons.transformSubset(call, converted, IgniteProject::create);
-
-        if (!transformOp.go(project))
-            call.transformTo(project.copy(project.getTraitSet(), ImmutableList.of(converted)));
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
deleted file mode 100644
index cf514d5..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.volcano.AbstractConverter;
-import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
-import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
-import org.apache.calcite.rel.rules.AggregateMergeRule;
-import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
-import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule;
-import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
-import org.apache.calcite.rel.rules.AggregateRemoveRule;
-import org.apache.calcite.rel.rules.AggregateStarTableRule;
-import org.apache.calcite.rel.rules.AggregateValuesRule;
-import org.apache.calcite.rel.rules.CalcRemoveRule;
-import org.apache.calcite.rel.rules.DateRangeRules;
-import org.apache.calcite.rel.rules.ExchangeRemoveConstantKeysRule;
-import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
-import org.apache.calcite.rel.rules.FilterJoinRule;
-import org.apache.calcite.rel.rules.FilterMergeRule;
-import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
-import org.apache.calcite.rel.rules.FilterTableScanRule;
-import org.apache.calcite.rel.rules.IntersectToDistinctRule;
-import org.apache.calcite.rel.rules.JoinCommuteRule;
-import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
-import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
-import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
-import org.apache.calcite.rel.rules.ProjectMergeRule;
-import org.apache.calcite.rel.rules.ProjectRemoveRule;
-import org.apache.calcite.rel.rules.ProjectToWindowRule;
-import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
-import org.apache.calcite.rel.rules.PruneEmptyRules;
-import org.apache.calcite.rel.rules.ReduceExpressionsRule;
-import org.apache.calcite.rel.rules.SemiJoinRule;
-import org.apache.calcite.rel.rules.SortJoinTransposeRule;
-import org.apache.calcite.rel.rules.SortProjectTransposeRule;
-import org.apache.calcite.rel.rules.SortRemoveConstantKeysRule;
-import org.apache.calcite.rel.rules.SortRemoveRule;
-import org.apache.calcite.rel.rules.SortUnionTransposeRule;
-import org.apache.calcite.rel.rules.SubQueryRemoveRule;
-import org.apache.calcite.rel.rules.TableScanRule;
-import org.apache.calcite.rel.rules.UnionMergeRule;
-import org.apache.calcite.rel.rules.UnionPullUpConstantsRule;
-import org.apache.calcite.rel.rules.UnionToDistinctRule;
-import org.apache.calcite.rel.rules.ValuesReduceRule;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-
-/**
- *
- */
-public class IgniteRules {
-    public static final List<RelOptRule> BASE_RULES = ImmutableList.of(
-        AggregateStarTableRule.INSTANCE,
-        AggregateStarTableRule.INSTANCE2,
-        TableScanRule.INSTANCE,
-        ProjectMergeRule.INSTANCE,
-        FilterTableScanRule.INSTANCE,
-        ProjectFilterTransposeRule.INSTANCE,
-        FilterProjectTransposeRule.INSTANCE,
-        FilterJoinRule.FILTER_ON_JOIN,
-        JoinPushExpressionsRule.INSTANCE,
-        AggregateExpandDistinctAggregatesRule.INSTANCE,
-        AggregateReduceFunctionsRule.INSTANCE,
-        FilterAggregateTransposeRule.INSTANCE,
-        ProjectWindowTransposeRule.INSTANCE,
-        JoinCommuteRule.INSTANCE,
-        JoinPushThroughJoinRule.RIGHT,
-        JoinPushThroughJoinRule.LEFT,
-        SortProjectTransposeRule.INSTANCE,
-        SortJoinTransposeRule.INSTANCE,
-        SortRemoveConstantKeysRule.INSTANCE,
-        SortUnionTransposeRule.INSTANCE,
-        ExchangeRemoveConstantKeysRule.EXCHANGE_INSTANCE,
-        ExchangeRemoveConstantKeysRule.SORT_EXCHANGE_INSTANCE);
-
-    public static final List<RelOptRule> ABSTRACT_RULES = ImmutableList.of(
-        AggregateProjectPullUpConstantsRule.INSTANCE2,
-        UnionPullUpConstantsRule.INSTANCE,
-        PruneEmptyRules.UNION_INSTANCE,
-        PruneEmptyRules.INTERSECT_INSTANCE,
-        PruneEmptyRules.MINUS_INSTANCE,
-        PruneEmptyRules.PROJECT_INSTANCE,
-        PruneEmptyRules.FILTER_INSTANCE,
-        PruneEmptyRules.SORT_INSTANCE,
-        PruneEmptyRules.AGGREGATE_INSTANCE,
-        PruneEmptyRules.JOIN_LEFT_INSTANCE,
-        PruneEmptyRules.JOIN_RIGHT_INSTANCE,
-        PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
-        UnionMergeRule.INSTANCE,
-        UnionMergeRule.INTERSECT_INSTANCE,
-        UnionMergeRule.MINUS_INSTANCE,
-        ProjectToWindowRule.PROJECT,
-        FilterMergeRule.INSTANCE,
-        DateRangeRules.FILTER_INSTANCE,
-        IntersectToDistinctRule.INSTANCE);
-
-    public static final List<RelOptRule> ABSTRACT_RELATIONAL_RULES = ImmutableList.of(
-        FilterJoinRule.FILTER_ON_JOIN,
-        FilterJoinRule.JOIN,
-        AbstractConverter.ExpandConversionRule.INSTANCE,
-        JoinCommuteRule.INSTANCE,
-        SemiJoinRule.PROJECT,
-        SemiJoinRule.JOIN,
-        AggregateRemoveRule.INSTANCE,
-        UnionToDistinctRule.INSTANCE,
-        ProjectRemoveRule.INSTANCE,
-        AggregateJoinTransposeRule.INSTANCE,
-        AggregateMergeRule.INSTANCE,
-        AggregateProjectMergeRule.INSTANCE,
-        CalcRemoveRule.INSTANCE,
-        SortRemoveRule.INSTANCE);
-
-    public static final List<RelOptRule> CONSTANT_REDUCTION_RULES = ImmutableList.of(
-        ReduceExpressionsRule.PROJECT_INSTANCE,
-        ReduceExpressionsRule.FILTER_INSTANCE,
-        ReduceExpressionsRule.CALC_INSTANCE,
-        ReduceExpressionsRule.WINDOW_INSTANCE,
-        ReduceExpressionsRule.JOIN_INSTANCE,
-        ValuesReduceRule.FILTER_INSTANCE,
-        ValuesReduceRule.PROJECT_FILTER_INSTANCE,
-        ValuesReduceRule.PROJECT_INSTANCE,
-        AggregateValuesRule.INSTANCE);
-
-    public static final List<RelOptRule> SUBQUERY_REWRITE_RULES = ImmutableList.of(
-        SubQueryRemoveRule.FILTER,
-        SubQueryRemoveRule.PROJECT,
-        SubQueryRemoveRule.JOIN);
-
-    public static final List<RelOptRule> IGNITE_RULES = ImmutableList.of(
-        IgniteFilterRule.INSTANCE,
-        IgniteProjectRule.INSTANCE,
-        IgniteJoinRule.INSTANCE);
-
-    public static List<RelOptRule> logicalRules(PlannerContext ctx) {
-        return ImmutableList.<RelOptRule>builder()
-            .addAll(BASE_RULES)
-            .addAll(ABSTRACT_RULES)
-            .addAll(ABSTRACT_RELATIONAL_RULES)
-            .addAll(IGNITE_RULES)
-            .build();
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
new file mode 100644
index 0000000..00a2bd9
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/JoinConverter.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDerivedDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class JoinConverter extends AbstractVariableConverter {
+    public JoinConverter() {
+        super(LogicalJoin.class, Convention.NONE, IgniteConvention.INSTANCE, "JoinConverter");
+    }
+
+    @Override public List<RelNode> convert(RelNode rel, boolean firstOnly) {
+        LogicalJoin join = (LogicalJoin) rel;
+
+        RelNode left = convert(join.getLeft(), IgniteConvention.INSTANCE);
+        RelNode right = convert(join.getRight(), IgniteConvention.INSTANCE);
+
+        RelOptCluster cluster = join.getCluster();
+        RelMetadataQuery mq = cluster.getMetadataQuery();
+
+        List<IgniteDistribution> leftTraits = IgniteMdDerivedDistribution.deriveDistributions(left, IgniteConvention.INSTANCE, mq);
+        List<IgniteDistribution> rightTraits = IgniteMdDerivedDistribution.deriveDistributions(left, IgniteConvention.INSTANCE, mq);
+
+        List<IgniteDistributions.BiSuggestion> suggestions = IgniteDistributions.suggestJoin(leftTraits, rightTraits, join.analyzeCondition(), join.getJoinType());
+
+        return firstOnly ? F.asList(create(join, left, right, F.first(suggestions))) :
+            Commons.transform(suggestions, s -> create(join, left, right, s));
+    }
+
+    private static RelNode create(LogicalJoin join, RelNode left, RelNode right, IgniteDistributions.BiSuggestion suggest) {
+        left = convert(left, suggest.left());
+        right = convert(right, suggest.right());
+
+        RelTraitSet traitSet = join.getTraitSet().replace(IgniteConvention.INSTANCE).replace(suggest.out());
+
+        return new IgniteJoin(join.getCluster(), traitSet, left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType());
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverter.java
new file mode 100644
index 0000000..da281a0
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverter.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDerivedDistribution;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class ProjectConverter extends AbstractVariableConverter {
+    public ProjectConverter() {
+        super(LogicalProject.class, Convention.NONE, IgniteConvention.INSTANCE, "ProjectConverter");
+    }
+
+    @Override public List<RelNode> convert(RelNode rel, boolean firstOnly) {
+        LogicalProject project = (LogicalProject) rel;
+
+        RelNode input = convert(project.getInput(), IgniteConvention.INSTANCE);
+
+        RelOptCluster cluster = rel.getCluster();
+        RelMetadataQuery mq = cluster.getMetadataQuery();
+
+        List<IgniteDistribution> distrs = IgniteMdDerivedDistribution.deriveDistributions(input, IgniteConvention.INSTANCE, mq);
+
+        return firstOnly ? F.asList(create(project, input, F.first(distrs))) :
+            Commons.transform(distrs, d -> create(project, input, d));
+    }
+
+    private static IgniteProject create(LogicalProject project, RelNode input, IgniteDistribution distr) {
+        RelTraitSet traits = project.getTraitSet()
+            .replace(IgniteMdDistribution.project(input.getRowType(), distr, project.getProjects()))
+            .replace(IgniteConvention.INSTANCE);
+
+        return new IgniteProject(project.getCluster(), traits, convert(input, distr), project.getProjects(), project.getRowType());
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableScanConverter.java
similarity index 50%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableScanConverter.java
index f9e4a8d..7665a78 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableScanConverter.java
@@ -14,27 +14,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
+package org.apache.ignite.internal.processors.query.calcite.rule;
 
-import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 
 /**
  *
  */
-public class TableScanNode extends RelGraphNode {
-    private final List<String> tableName;
-
-    private TableScanNode(List<String> tableName) {
-        this.tableName = tableName;
+public class TableScanConverter extends ConverterRule {
+    public TableScanConverter() {
+        super(LogicalTableScan.class, Convention.NONE, IgniteConvention.INSTANCE, "TableScanConverter");
     }
 
-    public static TableScanNode create(IgniteTableScan rel) {
-        return new TableScanNode(rel.getTable().getQualifiedName());
-    }
+    @Override public RelNode convert(RelNode rel) {
+        LogicalTableScan scan = (LogicalTableScan) rel;
 
-    @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
-        return ctx.getSchema().getTableForMember(tableName).toRel(ctx);
+        RelTraitSet traitSet = scan.getTraitSet().replace(IgniteConvention.INSTANCE);
+        return new IgniteTableScan(rel.getCluster(), traitSet, scan.getTable());
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
index 18c8e5e..271c5f6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -55,7 +56,10 @@ public class IgniteSchema extends AbstractSchema {
      * @param cacheInfo Cache info.
      */
     public void onSqlTypeCreate(GridQueryTypeDescriptor typeDesc, GridCacheContextInfo cacheInfo) {
-        addTable(new IgniteTable(typeDesc.tableName(), cacheInfo.name(), Commons.rowType(typeDesc)));
+        Object identityKey = cacheInfo.config().getCacheMode() == CacheMode.PARTITIONED ?
+            cacheInfo.cacheContext().group().affinity().similarAffinityKey() : null;
+
+        addTable(new IgniteTable(typeDesc.tableName(), cacheInfo.name(), Commons.rowType(typeDesc), identityKey));
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 14b8191..56e9302 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -17,25 +17,33 @@
 
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
+import com.google.common.collect.ImmutableList;
+import java.util.List;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Statistic;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.AffinityFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /** */
@@ -43,11 +51,13 @@ public class IgniteTable extends AbstractTable implements TranslatableTable, Sca
     private final String tableName;
     private final String cacheName;
     private final RowType rowType;
+    private final Object identityKey;
 
-    public IgniteTable(String tableName, String cacheName, RowType rowType) {
+    public IgniteTable(String tableName, String cacheName, RowType rowType, Object identityKey) {
         this.tableName = tableName;
         this.cacheName = cacheName;
         this.rowType = rowType;
+        this.identityKey = identityKey;
     }
 
     /**
@@ -69,17 +79,30 @@ public class IgniteTable extends AbstractTable implements TranslatableTable, Sca
         return rowType.asRelDataType(typeFactory);
     }
 
+    @Override public Statistic getStatistic() {
+        return new TableStatistics();
+    }
+
     /** {@inheritDoc} */
     @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
         RelOptCluster cluster = context.getCluster();
-        PlannerContext ctx = Commons.plannerContext(cluster.getPlanner().getContext());
-        RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION)
-                .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(ctx));
-        return new IgniteTableScan(cluster, traitSet, relOptTable);
+        RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
+                .replaceIf(DistributionTraitDef.INSTANCE, this::getDistribution);
+
+        return new LogicalTableScan(cluster, traitSet, relOptTable);
+    }
+
+    public IgniteDistribution getDistribution() {
+        Object key = identityKey();
+
+        if (key == null)
+            return IgniteDistributions.broadcast();
+
+        return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(CU.cacheId(cacheName), key));
     }
 
-    public DistributionTrait distributionTrait(PlannerContext context) {
-        return context.distributionTrait(CU.cacheId(cacheName), rowType);
+    protected Object identityKey() {
+        return identityKey;
     }
 
     public FragmentInfo fragmentInfo(PlannerContext ctx) {
@@ -89,4 +112,26 @@ public class IgniteTable extends AbstractTable implements TranslatableTable, Sca
     @Override public Enumerable<Object[]> scan(DataContext root) {
         throw new AssertionError(); // TODO
     }
+
+    private class TableStatistics implements Statistic {
+        @Override public Double getRowCount() {
+            return null;
+        }
+
+        @Override public boolean isKey(ImmutableBitSet columns) {
+            return false;
+        }
+
+        @Override public List<RelReferentialConstraint> getReferentialConstraints() {
+            return ImmutableList.of();
+        }
+
+        @Override public List<RelCollation> getCollations() {
+            return ImmutableList.of();
+        }
+
+        @Override public RelDistribution getDistribution() {
+            return IgniteTable.this.getDistribution();
+        }
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ConversionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ConversionContext.java
index 19ccfc2..0d8399e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ConversionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ConversionContext.java
@@ -16,10 +16,10 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.ExpToRexTranslator;
 
 /**
@@ -30,7 +30,7 @@ public interface ConversionContext extends RelOptTable.ToRelContext {
 
     RelOptSchema getSchema();
 
-    Context getContext();
+    PlannerContext getContext();
 
     ExpToRexTranslator getExpressionTranslator();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/FilterNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/FilterNode.java
index 82041d6..1fd3da2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/FilterNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/FilterNode.java
@@ -16,37 +16,43 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.Expression;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.RexToExpTranslator;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
 public class FilterNode extends RelGraphNode {
-    private final int[] variables;
     private final Expression condition;
 
-    private FilterNode(Expression condition, int[] variables) {
-        this.variables = variables;
+    private FilterNode(Expression condition) {
         this.condition = condition;
     }
 
     public static FilterNode create(IgniteFilter rel, RexToExpTranslator expTranslator) {
-        return new FilterNode(expTranslator.translate(rel.getCondition()),
-            rel.getVariablesSet().stream().mapToInt(CorrelationId::getId).toArray());
+        return new FilterNode(expTranslator.translate(rel.getCondition()));
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
-        return IgniteFilter.create(
-            F.first(children),
-            condition.implement(ctx.getExpressionTranslator()),
-            Arrays.stream(variables).mapToObj(CorrelationId::new).collect(Collectors.toSet()));
+        RelNode input = F.first(children);
+        RexNode condition = this.condition.implement(ctx.getExpressionTranslator());
+        RelOptCluster cluster = input.getCluster();
+        RelMetadataQuery mq = cluster.getMetadataQuery();
+
+        RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
+            .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.filter(mq, input, condition));
+
+        return new IgniteFilter(cluster, traits, input, condition);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/GraphToRelConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/GraphToRelConverter.java
index 5de192a..f7574bf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/GraphToRelConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/GraphToRelConverter.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 import com.google.common.collect.ImmutableList;
 import java.util.List;
 import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
@@ -29,7 +28,9 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.tools.RelBuilder;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.ExpToRexTranslator;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
@@ -58,8 +59,8 @@ public class GraphToRelConverter implements ConversionContext {
         return relBuilder.getRelOptSchema();
     }
 
-    @Override public Context getContext() {
-        return getCluster().getPlanner().getContext();
+    @Override public PlannerContext getContext() {
+        return Commons.plannerContext(getCluster().getPlanner().getContext());
     }
 
     @Override public ExpToRexTranslator getExpressionTranslator() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/JoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/JoinNode.java
index 587fb35..c26d2c7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/JoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/JoinNode.java
@@ -34,22 +34,19 @@ public class JoinNode extends RelGraphNode {
     private final Expression condition;
     private final int[] variables;
     private final JoinRelType joinType;
-    private final boolean semiDone;
 
-    private JoinNode(RelTraitSet traits, Expression condition, int[] variables, JoinRelType joinType, boolean semiDone) {
+    private JoinNode(RelTraitSet traits, Expression condition, int[] variables, JoinRelType joinType) {
         super(traits);
         this.condition = condition;
         this.variables = variables;
         this.joinType = joinType;
-        this.semiDone = semiDone;
     }
 
     public static JoinNode create(IgniteJoin rel, RexToExpTranslator expTranslator) {
         return new JoinNode(rel.getTraitSet(),
             expTranslator.translate(rel.getCondition()),
             rel.getVariablesSet().stream().mapToInt(CorrelationId::getId).toArray(),
-            rel.getJoinType(),
-            rel.isSemiJoinDone());
+            rel.getJoinType());
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
@@ -64,7 +61,6 @@ public class JoinNode extends RelGraphNode {
             right,
             ctx.getExpressionTranslator().translate(condition),
             Arrays.stream(variables).mapToObj(CorrelationId::new).collect(Collectors.toSet()),
-            joinType,
-            semiDone);
+            joinType);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ProjectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ProjectNode.java
index 7a157df..56c60df 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ProjectNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ProjectNode.java
@@ -17,11 +17,18 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
 import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.Expression;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.RexToExpTranslator;
 import org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
@@ -42,8 +49,14 @@ public class ProjectNode extends RelGraphNode {
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
-        return IgniteProject.create(F.first(children),
-            ctx.getExpressionTranslator().translate(projects),
-            dataType.toRelDataType(ctx.getTypeFactory()));
+        RelNode input = F.first(children);
+        List<RexNode> projects = ctx.getExpressionTranslator().translate(this.projects);
+        RelOptCluster cluster = input.getCluster();
+        RelMetadataQuery mq = cluster.getMetadataQuery();
+
+        RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
+            .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.project(mq, input, projects));
+
+        return new IgniteProject(cluster, traits, input, projects, dataType.toRelDataType(ctx.getTypeFactory()));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
index 24534ec..96bc0dd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
@@ -21,8 +21,8 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
-import org.apache.ignite.internal.processors.query.calcite.splitter.SourceImpl;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelSourceImpl;
 
 
 /**
@@ -30,16 +30,16 @@ import org.apache.ignite.internal.processors.query.calcite.splitter.SourceImpl;
  */
 public class ReceiverNode extends RelGraphNode {
     private final DataType dataType;
-    private final Source source;
+    private final RelSource source;
 
-    private ReceiverNode(RelTraitSet traits, DataType dataType, Source source) {
+    private ReceiverNode(RelTraitSet traits, DataType dataType, RelSource source) {
         super(traits);
         this.dataType = dataType;
         this.source = source;
     }
 
     public static ReceiverNode create(IgniteReceiver rel) {
-        Source source = new SourceImpl(rel.source().exchangeId(), rel.source().mapping());
+        RelSource source = new RelSourceImpl(rel.source().exchangeId(), rel.source().mapping());
 
         return new ReceiverNode(rel.getTraitSet(), DataType.fromType(rel.getRowType()), source);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
index 6e534d2..d935255 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
@@ -28,15 +28,15 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.RexToExpTranslator;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
-public class RelToGraphConverter {
+public class RelToGraphConverter implements RelOp<IgniteRel, RelGraph> {
     private final RexToExpTranslator rexTranslator = new RexToExpTranslator();
 
     private RelGraph graph;
@@ -52,7 +52,7 @@ public class RelToGraphConverter {
         }
     }
 
-    private final class Implementor implements RelImplementor<Item> {
+    private final class Implementor implements org.apache.ignite.internal.processors.query.calcite.rel.Implementor<Item> {
         @Override public Item implement(IgniteFilter rel) {
             return new Item(graph.addNode(curParent, FilterNode.create(rel, rexTranslator)), Commons.cast(rel.getInputs()));
         }
@@ -86,7 +86,7 @@ public class RelToGraphConverter {
         }
     }
 
-    public RelGraph convert(IgniteRel root) {
+    @Override public RelGraph go(IgniteRel root) {
         graph = new RelGraph();
 
         Implementor implementor = new Implementor();
@@ -99,7 +99,7 @@ public class RelToGraphConverter {
             curParent = item.parentId;
 
             for (IgniteRel child : item.children) {
-                stack.push(child.implement(implementor));
+                stack.push(implementor.go(child));
             }
         }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
index 5164eed..2940015 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
@@ -17,18 +17,24 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
 import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
+import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
 public class SenderNode extends RelGraphNode {
-    private final Target target;
+    private final RelTarget target;
 
-    private SenderNode(Target target) {
+    private SenderNode(RelTarget target) {
         this.target = target;
     }
 
@@ -37,6 +43,14 @@ public class SenderNode extends RelGraphNode {
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
-        return IgniteSender.create(F.first(children), target);
+        RelNode input = F.first(children);
+        RelOptCluster cluster = input.getCluster();
+        RelMetadataQuery mq = cluster.getMetadataQuery();
+
+        RelTraitSet traits = cluster.traitSet()
+            .replace(IgniteConvention.INSTANCE)
+            .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution._distribution(input, mq));
+
+        return new IgniteSender(cluster, traits, input, target);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SerializedTraits.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SerializedTraits.java
index b771bfe..94efe86 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SerializedTraits.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SerializedTraits.java
@@ -17,49 +17,39 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
  */
 public class SerializedTraits implements Serializable {
-    private static final Byte IGNITE_CONVENTION = 0;
+    private static final Byte CONVENTION = 0;
 
     private final List<Serializable> traits;
 
     public SerializedTraits(RelTraitSet traits) {
-        this.traits = translate(traits);
+        this.traits = Commons.transform(traits, this::toSerializable);
     }
 
     public RelTraitSet toTraitSet(RelOptCluster cluster) {
         RelTraitSet traits = cluster.traitSet();
 
-        for (Serializable trait : this.traits) {
+        for (Serializable trait : this.traits)
             traits.replace(fromSerializable(trait));
-        }
 
         return traits.simplify();
     }
 
-    private List<Serializable> translate(List<RelTrait> traits) {
-        ArrayList<Serializable> res = new ArrayList<>(traits.size());
-        for (RelTrait trait : traits) {
-            res.add(toSerializable(trait));
-        }
-
-        return res;
-    }
-
     private Serializable toSerializable(RelTrait trait) {
         if (trait instanceof Serializable)
             return (Serializable) trait;
-        if (trait == IgniteRel.IGNITE_CONVENTION)
-            return IGNITE_CONVENTION;
+        if (trait == IgniteConvention.INSTANCE)
+            return CONVENTION;
 
         throw new AssertionError();
     }
@@ -67,8 +57,8 @@ public class SerializedTraits implements Serializable {
     private RelTrait fromSerializable(Serializable trait) {
         if (trait instanceof RelTrait)
             return (RelTrait) trait;
-        if (IGNITE_CONVENTION.equals(trait))
-            return IgniteRel.IGNITE_CONVENTION;
+        if (CONVENTION.equals(trait))
+            return IgniteConvention.INSTANCE;
 
         throw new AssertionError();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
index f9e4a8d..097a8fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/TableScanNode.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
 import java.util.List;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 
@@ -26,15 +27,18 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 public class TableScanNode extends RelGraphNode {
     private final List<String> tableName;
 
-    private TableScanNode(List<String> tableName) {
+    private TableScanNode(RelTraitSet traits, List<String> tableName) {
+        super(traits);
         this.tableName = tableName;
     }
 
     public static TableScanNode create(IgniteTableScan rel) {
-        return new TableScanNode(rel.getTable().getQualifiedName());
+        return new TableScanNode(rel.getTraitSet(), rel.getTable().getQualifiedName());
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
-        return ctx.getSchema().getTableForMember(tableName).toRel(ctx);
+        return new IgniteTableScan(ctx.getCluster(),
+            traitSet.toTraitSet(ctx.getCluster()),
+            ctx.getSchema().getTableForMember(tableName));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
similarity index 94%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
index 58f7146..a065dbe 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Edge.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.util;
+package org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import org.apache.calcite.rel.RelNode;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index e523f50..c812f66 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -27,13 +27,13 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
  */
-public class Fragment implements Source {
+public class Fragment implements RelSource {
     private static final AtomicLong ID_GEN = new AtomicLong();
 
     private final long exchangeId = ID_GEN.getAndIncrement();
@@ -47,7 +47,6 @@ public class Fragment implements Source {
     }
 
     public void init(PlannerContext ctx, RelMetadataQuery mq) {
-
         FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
 
         if (info.mapping() == null)
@@ -55,12 +54,12 @@ public class Fragment implements Source {
         else
             mapping = info.mapping().deduplicate();
 
-        ImmutableList<Pair<IgniteReceiver, Source>> sources = info.sources();
+        ImmutableList<Pair<IgniteReceiver, RelSource>> sources = info.sources();
 
         if (!F.isEmpty(sources)) {
-            for (Pair<IgniteReceiver, Source> input : sources) {
+            for (Pair<IgniteReceiver, RelSource> input : sources) {
                 IgniteReceiver receiver = input.left;
-                Source source = input.right;
+                RelSource source = input.right;
 
                 source.init(mapping, receiver.distribution(), ctx, mq);
             }
@@ -71,10 +70,10 @@ public class Fragment implements Source {
         return exchangeId;
     }
 
-    @Override public void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
+    @Override public void init(NodesMapping mapping, IgniteDistribution distribution, PlannerContext ctx, RelMetadataQuery mq) {
         assert remote();
 
-        ((IgniteSender) root).init(new TargetImpl(exchangeId, mapping, distribution));
+        ((IgniteSender) root).target(new RelTargetImpl(exchangeId, mapping, distribution));
 
         init(ctx, mq);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index fd0c75f..4a8ac5d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -26,7 +26,6 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQ
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.util.Edge;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
@@ -62,11 +61,11 @@ public class QueryPlan {
                 RelOptCluster cluster = child.getCluster();
                 RelTraitSet traitSet = child.getTraitSet();
 
-                IgniteSender sender = new IgniteSender(cluster, traitSet, child);
-                Fragment fragment = new Fragment(sender);
+                Fragment fragment = new Fragment(new IgniteSender(cluster, traitSet, child));
+
                 fragments.add(fragment);
 
-                parent.replaceInput(edge.childIdx(), new IgniteReceiver(cluster, traitSet, sender.getRowType(), fragment));
+                parent.replaceInput(edge.childIdx(), new IgniteReceiver(cluster, traitSet, child.getRowType(), fragment));
             }
         }
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
similarity index 88%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
index e0108b7..b4eb49e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
@@ -19,12 +19,12 @@ package org.apache.ignite.internal.processors.query.calcite.splitter;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
 /**
  *
  */
-public interface Source {
+public interface RelSource {
     /**
      * @return Exchange id, has to be unique in scope of query.
      */
@@ -41,7 +41,7 @@ public interface Source {
      * @param ctx Context.
      * @param mq Metadata query instance.
      */
-    default void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
-        // No-op.
+    default void init(NodesMapping mapping, IgniteDistribution distribution, PlannerContext ctx, RelMetadataQuery mq) {
+        // No-op
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
similarity index 90%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
index 44b271d..b0ca757 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
@@ -22,11 +22,11 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 /**
  *
  */
-public class SourceImpl implements Source, Serializable {
+public class RelSourceImpl implements RelSource, Serializable {
     private final long exchangeId;
     private final NodesMapping mapping;
 
-    public SourceImpl(long exchangeId, NodesMapping mapping) {
+    public RelSourceImpl(long exchangeId, NodesMapping mapping) {
         this.exchangeId = exchangeId;
         this.mapping = mapping;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
similarity index 91%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
index fecea20..b031b8e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Target.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
@@ -17,13 +17,13 @@
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
 /**
  *
  */
-public interface Target {
+public interface RelTarget {
     long exchangeId();
     NodesMapping mapping();
-    DistributionTrait distribution();
+    IgniteDistribution distribution();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
similarity index 81%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
index a69a8f1..a3c9b29 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TargetImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
@@ -18,17 +18,17 @@ package org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import java.io.Serializable;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
 /**
  *
  */
-public class TargetImpl implements Target, Serializable {
+public class RelTargetImpl implements RelTarget, Serializable {
     private final long exchangeId;
     private final NodesMapping mapping;
-    private final DistributionTrait distribution;
+    private final IgniteDistribution distribution;
 
-    public TargetImpl(long exchangeId, NodesMapping mapping, DistributionTrait distribution) {
+    public RelTargetImpl(long exchangeId, NodesMapping mapping, IgniteDistribution distribution) {
         this.exchangeId = exchangeId;
         this.mapping = mapping;
         this.distribution = distribution;
@@ -42,7 +42,7 @@ public class TargetImpl implements Target, Serializable {
         return mapping;
     }
 
-    @Override public DistributionTrait distribution() {
+    @Override public IgniteDistribution distribution() {
         return distribution;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 53ae9df..f2ea67a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -25,8 +25,8 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelShuttle;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle;
 
 /**
  *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AffinityFactory.java
similarity index 61%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AffinityFactory.java
index 863f93c..f1dbbac 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AffinityFactory.java
@@ -16,7 +16,6 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
-import java.io.ObjectStreamException;
 import java.util.List;
 import java.util.UUID;
 import java.util.function.ToIntFunction;
@@ -29,29 +28,19 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  *
  */
-public final class HashFunctionFactory extends AbstractDestinationFunctionFactory {
-    public static final DestinationFunctionFactory INSTANCE = new HashFunctionFactory();
+public final class AffinityFactory extends AbstractDestinationFunctionFactory {
+    private final int cacheId;
+    private final Object key;
 
-    @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
-        assert m != null && !F.isEmpty(m.assignments());
-
-        int[] fields = k.toIntArray();
-
-        ToIntFunction<Object> hashFun = r -> {
-            Object[] row = (Object[]) r;
-
-            if (row == null)
-                return 0;
-
-            int hash = 1;
-
-            for (int i : fields)
-                hash = 31 * hash + (row[i] == null ? 0 : row[i].hashCode());
+    public AffinityFactory(int cacheId, Object key) {
+        this.cacheId = cacheId;
+        this.key = key;
+    }
 
-            return hash;
-        };
+    @Override public DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys) {
+        assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
 
-        List<List<UUID>> assignments = m.assignments();
+        List<List<UUID>> assignments = mapping.assignments();
 
         if (U.assertionsEnabled()) {
             for (List<UUID> assignment : assignments) {
@@ -59,14 +48,13 @@ public final class HashFunctionFactory extends AbstractDestinationFunctionFactor
             }
         }
 
-        return r -> assignments.get(hashFun.applyAsInt(r) % assignments.size());
-    }
+        ToIntFunction<Object> rowToPart = ctx.kernalContext()
+            .cache().context().cacheContext(cacheId).affinity()::partition;
 
-    @Override public Object key() {
-        return "HashFunctionFactory";
+        return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
     }
 
-    private Object readResolve() throws ObjectStreamException {
-        return INSTANCE;
+    @Override public Object key() {
+        return key;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 0fe775a..5972a11 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -16,43 +16,59 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import com.google.common.collect.Ordering;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.ObjectStreamException;
 import java.io.Serializable;
+import java.util.List;
 import java.util.Objects;
+import org.apache.calcite.plan.RelMultipleTrait;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.mapping.Mappings;
+
+import static org.apache.calcite.rel.RelDistribution.Type.ANY;
+import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.RANDOM_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.RANGE_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.ROUND_ROBIN_DISTRIBUTED;
 
 /**
  *
  */
-public final class DistributionTrait implements RelTrait, Serializable {
-    private DistributionType type;
+public final class DistributionTrait implements IgniteDistribution, Serializable {
+    private static final Ordering<Iterable<Integer>> ORDERING =
+        Ordering.<Integer>natural().lexicographical();
+
+    private RelDistribution.Type type;
     private ImmutableIntList keys;
     private DestinationFunctionFactory functionFactory;
 
     public DistributionTrait() {
     }
 
-    public DistributionTrait(DistributionType type, ImmutableIntList keys, DestinationFunctionFactory functionFactory) {
+    public DistributionTrait(RelDistribution.Type type, ImmutableIntList keys, DestinationFunctionFactory functionFactory) {
+        if (type == RANGE_DISTRIBUTED || type == ROUND_ROBIN_DISTRIBUTED)
+            throw new IllegalArgumentException("Distribution type " + type + " is unsupported.");
+
         this.type = type;
         this.keys = keys;
         this.functionFactory = functionFactory;
     }
 
-    public DistributionType type() {
+    @Override public RelDistribution.Type getType() {
         return type;
     }
 
-    public DestinationFunctionFactory destinationFunctionFactory() {
+    @Override public DestinationFunctionFactory destinationFunctionFactory() {
         return functionFactory;
     }
 
-    public ImmutableIntList keys() {
+    @Override public ImmutableIntList getKeys() {
         return keys;
     }
 
@@ -65,7 +81,9 @@ public final class DistributionTrait implements RelTrait, Serializable {
         if (o instanceof DistributionTrait) {
             DistributionTrait that = (DistributionTrait) o;
 
-            return type == that.type() && Objects.equals(keys, that.keys);
+            return type == that.type
+                && Objects.equals(keys, that.keys)
+                && Objects.equals(functionFactory.key(), that.functionFactory.key());
         }
 
         return false;
@@ -76,10 +94,10 @@ public final class DistributionTrait implements RelTrait, Serializable {
     }
 
     @Override public String toString() {
-        return type + (type == DistributionType.HASH ? String.valueOf(keys) : "");
+        return type + (type == Type.HASH_DISTRIBUTED ? "[" + functionFactory.key() + "]" + keys : "");
     }
 
-    @Override public RelTraitDef getTraitDef() {
+    @Override public DistributionTraitDef getTraitDef() {
         return DistributionTraitDef.INSTANCE;
     }
 
@@ -92,15 +110,15 @@ public final class DistributionTrait implements RelTrait, Serializable {
 
         DistributionTrait other = (DistributionTrait) trait;
 
-        if (other.type() == DistributionType.ANY)
+        if (other.getType() == ANY)
             return true;
 
-        if (type() == other.type())
-            return type() != DistributionType.HASH
+        if (getType() == other.getType())
+            return getType() != HASH_DISTRIBUTED
                 || (Objects.equals(keys, other.keys)
                     && Objects.equals(functionFactory, other.functionFactory));
 
-        return other.type() == DistributionType.RANDOM && type() == DistributionType.HASH;
+        return other.getType() == RANDOM_DISTRIBUTED && getType() == HASH_DISTRIBUTED;
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -110,7 +128,7 @@ public final class DistributionTrait implements RelTrait, Serializable {
     }
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        type = (DistributionType) in.readObject();
+        type = (Type) in.readObject();
         keys = ImmutableIntList.of((int[])in.readObject());
         functionFactory = (DestinationFunctionFactory) in.readObject();
     }
@@ -118,4 +136,39 @@ public final class DistributionTrait implements RelTrait, Serializable {
     private Object readResolve() throws ObjectStreamException {
         return DistributionTraitDef.INSTANCE.canonize(this);
     }
+
+    @Override public IgniteDistribution apply(Mappings.TargetMapping mapping) {
+        if (keys.isEmpty())
+            return this;
+
+        assert type == HASH_DISTRIBUTED;
+
+        List<Integer> newKeys = IgniteDistributions.projectDistributionKeys(mapping, keys);
+
+        return newKeys.size() == keys.size() ? IgniteDistributions.hash(newKeys, functionFactory) :
+            IgniteDistributions.random();
+    }
+
+    @Override public boolean isTop() {
+        return type == Type.ANY;
+    }
+
+    @Override public int compareTo(RelMultipleTrait o) {
+        // TODO is this method really needed??
+
+        final IgniteDistribution distribution = (IgniteDistribution) o;
+
+        if (type == distribution.getType()
+            && (type == Type.HASH_DISTRIBUTED
+            || type == Type.RANGE_DISTRIBUTED)) {
+            int cmp = ORDERING.compare(getKeys(), distribution.getKeys());
+
+            if (cmp == 0)
+                cmp = Integer.compare(functionFactory.key().hashCode(), distribution.destinationFunctionFactory().key().hashCode());
+
+            return cmp;
+        }
+
+        return type.compareTo(distribution.getType());
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index 22e8815..306b7aa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -16,42 +16,51 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 
 /**
  *
  */
-public class DistributionTraitDef extends RelTraitDef<DistributionTrait> {
+public class DistributionTraitDef extends RelTraitDef<IgniteDistribution> {
     /** */
     public static final DistributionTraitDef INSTANCE = new DistributionTraitDef();
 
-    @Override public Class<DistributionTrait> getTraitClass() {
-        return DistributionTrait.class;
+    @Override public Class<IgniteDistribution> getTraitClass() {
+        return IgniteDistribution.class;
     }
 
     @Override public String getSimpleName() {
         return "distr";
     }
 
-    @Override public RelNode convert(RelOptPlanner planner, RelNode rel, DistributionTrait targetDist, boolean allowInfiniteCostConverters) {
-        DistributionTrait srcDist = rel.getTraitSet().getTrait(INSTANCE);
+    @Override public RelNode convert(RelOptPlanner planner, RelNode rel, IgniteDistribution targetDist, boolean allowInfiniteCostConverters) {
+        if (rel.getConvention() == Convention.NONE)
+            return null;
+
+        RelDistribution srcDist = rel.getTraitSet().getTrait(INSTANCE);
 
-        // Source and Target have the same trait.
-        if (srcDist.equals(targetDist))
+        if (srcDist == targetDist) // has to be interned
             return rel;
 
-        if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
-            return null;
+        switch(targetDist.getType()){
+            case HASH_DISTRIBUTED:
+            case BROADCAST_DISTRIBUTED:
+            case SINGLETON:
+                Exchange exchange = new IgniteExchange(rel.getCluster(), rel.getTraitSet().replace(targetDist), rel, targetDist);
+                RelNode newRel = planner.register(exchange, rel);
+                RelTraitSet newTraits = rel.getTraitSet().replace(targetDist);
+
+                if (!newRel.getTraitSet().equals(newTraits))
+                    newRel = planner.changeTraits(newRel, newTraits);
 
-        switch(targetDist.type()){
-            case HASH:
-            case BROADCAST:
-            case SINGLE:
-                return new IgniteExchange(rel.getCluster(), rel.getTraitSet().replace(targetDist), rel);
+                return newRel;
             case ANY:
                 return rel;
             default:
@@ -59,11 +68,11 @@ public class DistributionTraitDef extends RelTraitDef<DistributionTrait> {
         }
     }
 
-    @Override public boolean canConvert(RelOptPlanner planner, DistributionTrait fromTrait, DistributionTrait toTrait) {
+    @Override public boolean canConvert(RelOptPlanner planner, IgniteDistribution fromTrait, IgniteDistribution toTrait) {
         return true;
     }
 
-    @Override public DistributionTrait getDefault() {
+    @Override public IgniteDistribution getDefault() {
         return IgniteDistributions.any();
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
index 863f93c..82c7325 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -63,7 +63,7 @@ public final class HashFunctionFactory extends AbstractDestinationFunctionFactor
     }
 
     @Override public Object key() {
-        return "HashFunctionFactory";
+        return "DefaultHashFunction";
     }
 
     private Object readResolve() throws ObjectStreamException {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
similarity index 66%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
index ad710aa..5f3b175 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
@@ -14,14 +14,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.metadata;
+package org.apache.ignite.internal.processors.query.calcite.trait;
 
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.util.mapping.Mappings;
 
 /**
  *
  */
-public interface TableDistributionService {
-    DistributionTrait distribution(int cacheId, RowType rowType);
+public interface IgniteDistribution extends RelDistribution {
+    DestinationFunctionFactory destinationFunctionFactory();
+    @Override IgniteDistribution apply(Mappings.TargetMapping mapping);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 6a9e76d..3e3aa55 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -20,59 +20,253 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
 
-import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
+import static org.apache.calcite.rel.RelDistribution.Type.ANY;
+import static org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.RANDOM_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.SINGLETON;
+import static org.apache.calcite.rel.core.JoinRelType.INNER;
+import static org.apache.calcite.rel.core.JoinRelType.LEFT;
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
 
 /**
  *
  */
 public class IgniteDistributions {
-    private static final DistributionTrait BROADCAST = new DistributionTrait(DistributionType.BROADCAST, ImmutableIntList.of(), AllTargetsFactory.INSTANCE);
-    private static final DistributionTrait SINGLE = new DistributionTrait(DistributionType.SINGLE, ImmutableIntList.of(), SingleTargetFactory.INSTANCE);
-    private static final DistributionTrait RANDOM = new DistributionTrait(DistributionType.RANDOM, ImmutableIntList.of(), RandomTargetFactory.INSTANCE);
-    private static final DistributionTrait ANY    = new DistributionTrait(DistributionType.ANY, ImmutableIntList.of(), NoOpFactory.INSTANCE);
+    private static final int BEST_CNT = 3;
 
-    public static DistributionTrait any() {
-        return ANY;
+    private static final IgniteDistribution BROADCAST_DISTR = new DistributionTrait(BROADCAST_DISTRIBUTED, ImmutableIntList.of(), AllTargetsFactory.INSTANCE);
+    private static final IgniteDistribution SINGLETON_DISTR = new DistributionTrait(SINGLETON, ImmutableIntList.of(), SingleTargetFactory.INSTANCE);
+    private static final IgniteDistribution RANDOM_DISTR = new DistributionTrait(RANDOM_DISTRIBUTED, ImmutableIntList.of(), RandomTargetFactory.INSTANCE);
+    private static final IgniteDistribution ANY_DISTR = new DistributionTrait(ANY, ImmutableIntList.of(), NoOpFactory.INSTANCE);
+
+    public static IgniteDistribution any() {
+        return ANY_DISTR;
+    }
+
+    public static IgniteDistribution random() {
+        return RANDOM_DISTR;
+    }
+
+    public static IgniteDistribution single() {
+        return SINGLETON_DISTR;
+    }
+
+    public static IgniteDistribution broadcast() {
+        return BROADCAST_DISTR;
     }
 
-    public static DistributionTrait random() {
-        return RANDOM;
+    public static IgniteDistribution hash(List<Integer> keys) {
+        return DistributionTraitDef.INSTANCE.canonize(
+            new DistributionTrait(HASH_DISTRIBUTED, ImmutableIntList.copyOf(keys), HashFunctionFactory.INSTANCE));
+    }
+
+    public static IgniteDistribution hash(List<Integer> keys, DestinationFunctionFactory factory) {
+        return DistributionTraitDef.INSTANCE.canonize(
+            new DistributionTrait(HASH_DISTRIBUTED, ImmutableIntList.copyOf(keys), factory));
+    }
+
+    public static List<BiSuggestion> suggestJoin(IgniteDistribution leftIn, IgniteDistribution rightIn,
+        JoinInfo joinInfo, JoinRelType joinType) {
+        return topN(suggestJoin0(leftIn, rightIn, joinInfo, joinType), BEST_CNT);
+    }
+
+    public static List<BiSuggestion> suggestJoin(List<IgniteDistribution> leftIn, List<IgniteDistribution> rightIn,
+        JoinInfo joinInfo, JoinRelType joinType) {
+        HashSet<BiSuggestion> suggestions = new HashSet<>();
+
+        int bestCnt = 0;
+
+        for (IgniteDistribution leftIn0 : leftIn) {
+            for (IgniteDistribution rightIn0 : rightIn) {
+                for (BiSuggestion suggest : suggestJoin0(leftIn0, rightIn0, joinInfo, joinType)) {
+                    if (suggestions.add(suggest) && suggest.needExchange == 0 && (++bestCnt) == BEST_CNT)
+                        topN(new ArrayList<>(suggestions), BEST_CNT);
+                }
+            }
+        }
+
+        return topN(new ArrayList<>(suggestions), BEST_CNT);
     }
 
-    public static DistributionTrait single() {
-        return SINGLE;
+    private static ArrayList<BiSuggestion> suggestJoin0(IgniteDistribution leftIn, IgniteDistribution rightIn,
+        JoinInfo joinInfo, JoinRelType joinType) {
+        /*
+         * Distributions table:
+         *
+         * ===============INNER JOIN==============
+         * hash + hash = hash
+         * broadcast + hash = hash
+         * hash + broadcast = hash
+         * broadcast + broadcast = broadcast
+         * single + single = single
+         *
+         * ===============LEFT JOIN===============
+         * hash + hash = hash
+         * hash + broadcast = hash
+         * broadcast + broadcast = broadcast
+         * single + single = single
+         *
+         * ===============RIGHT JOIN==============
+         * hash + hash = hash
+         * broadcast + hash = hash
+         * broadcast + broadcast = broadcast
+         * single + single = single
+         *
+         * ===========FULL JOIN/CROSS JOIN========
+         * broadcast + broadcast = broadcast
+         * single + single = single
+         *
+         *
+         * others require redistribution
+         */
+
+        ArrayList<BiSuggestion> res = new ArrayList<>();
+
+        IgniteDistribution out, left, right;
+
+        if (joinType == LEFT || joinType == RIGHT || (joinType == INNER && !F.isEmpty(joinInfo.keys()))) {
+            HashSet<DestinationFunctionFactory> factories = U.newHashSet(3);
+
+            if (leftIn.getKeys().equals(joinInfo.leftKeys))
+                factories.add(leftIn.destinationFunctionFactory());
+
+            if (rightIn.getKeys().equals(joinInfo.rightKeys))
+                factories.add(rightIn.destinationFunctionFactory());
+
+            factories.add(HashFunctionFactory.INSTANCE);
+
+            for (DestinationFunctionFactory factory : factories) {
+                out = hash(joinInfo.leftKeys, factory);
+
+                left = hash(joinInfo.leftKeys, factory); right = hash(joinInfo.rightKeys, factory);
+                add(res, out, leftIn, rightIn, left, right);
+
+                if (joinType == INNER || joinType == LEFT) {
+                    left = hash(joinInfo.leftKeys, factory); right = broadcast();
+                    add(res, out, leftIn, rightIn, left, right);
+                }
+
+                if (joinType == INNER || joinType == RIGHT) {
+                    left = broadcast(); right = hash(joinInfo.rightKeys, factory);
+                    add(res, out, leftIn, rightIn, left, right);
+                }
+            }
+        }
+
+        out = left = right = broadcast();
+        add(res, out, leftIn, rightIn, left, right);
+
+        out = left = right = single();
+        add(res, out, leftIn, rightIn, left, right);
+
+        return res;
     }
 
-    public static DistributionTrait broadcast() {
-        return BROADCAST;
+    private static int add(ArrayList<BiSuggestion> dst, IgniteDistribution out, IgniteDistribution left, IgniteDistribution right,
+        IgniteDistribution newLeft, IgniteDistribution newRight) {
+        int exch = 0;
+
+        if (!left.satisfies(newLeft))
+            exch++;
+
+        if (!right.satisfies(newRight))
+            exch++;
+
+        dst.add(new BiSuggestion(out, newLeft, newRight, exch));
+
+        return exch;
     }
 
-    public static DistributionTrait hash(List<Integer> keys) {
-        return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), HashFunctionFactory.INSTANCE);
+    private static List<BiSuggestion> topN(ArrayList<BiSuggestion> src, int n) {
+        Collections.sort(src);
+
+        return src.size() <= n ? src : src.subList(0, n);
     }
 
-    public static DistributionTrait hash(List<Integer> keys, DestinationFunctionFactory factory) {
-        return new DistributionTrait(HASH, ImmutableIntList.copyOf(keys), factory);
+    public static List<Integer> projectDistributionKeys(Mappings.TargetMapping mapping, ImmutableIntList keys) {
+        if (mapping.getTargetCount() < keys.size())
+            return Collections.emptyList();
+
+        List<Integer> resKeys = new ArrayList<>(mapping.getTargetCount());
+
+        parent:
+        for (int i = 0; i < keys.size(); i++) {
+            int key = keys.getInt(i);
+
+            for (int j = 0; j < mapping.getTargetCount(); j++) {
+                if (mapping.getSourceOpt(j) == key) {
+                    resKeys.add(j);
+
+                    continue parent;
+                }
+            }
+
+            return Collections.emptyList();
+        }
+
+        return resKeys;
     }
 
-    public static List<DistributionTrait> deriveDistributions(RelNode rel, RelMetadataQuery mq) {
-        if (!(rel instanceof RelSubset)) {
-            DistributionTrait dist = IgniteMdDistribution.distribution(rel, mq);
+    public static class BiSuggestion implements Comparable<BiSuggestion> {
+        private final IgniteDistribution out;
+        private final IgniteDistribution left;
+        private final IgniteDistribution right;
+        private final int needExchange;
+
+        public BiSuggestion(IgniteDistribution out, IgniteDistribution left, IgniteDistribution right, int needExchange) {
+            this.out = out;
+            this.left = left;
+            this.right = right;
+            this.needExchange = needExchange;
+        }
+
+        public IgniteDistribution out() {
+            return out;
+        }
+
+        public IgniteDistribution left() {
+            return left;
+        }
+
+        public IgniteDistribution right() {
+            return right;
+        }
+
+        public int needExchange() {
+            return needExchange;
+        }
 
-            return dist.type() == DistributionType.ANY ? Collections.emptyList() : Collections.singletonList(dist);
+        @Override public int compareTo(@NotNull IgniteDistributions.BiSuggestion o) {
+            return Integer.compare(needExchange, o.needExchange);
         }
 
-        HashSet<DistributionTrait> res = new HashSet<>();
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
 
-        for (RelNode relNode : ((RelSubset) rel).getRels())
-            res.addAll(deriveDistributions(relNode, mq));
+            BiSuggestion that = (BiSuggestion) o;
 
-        return res.isEmpty() ? Collections.emptyList() : new ArrayList<>(res);
+            if (needExchange != that.needExchange) return false;
+            if (out != that.out) return false;
+            if (left != that.left) return false;
+            return right == that.right;
+        }
+
+        @Override public int hashCode() {
+            int result = out.hashCode();
+            result = 31 * result + left.hashCode();
+            result = 31 * result + right.hashCode();
+            result = 31 * result + needExchange;
+            return result;
+        }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 3a461cf..49d6091 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -21,28 +21,20 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.function.BiFunction;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptNode;
 import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.NotNull;
@@ -81,43 +73,6 @@ public final class Commons {
         return RelOptRule.operand(first, RelOptRule.operand(second, RelOptRule.any()));
     }
 
-    public static <T extends RelNode> RelOp<T, Boolean> transformSubset(RelOptRuleCall call, RelNode input, BiFunction<T, RelNode, RelNode> transformFun) {
-        return rel -> {
-            if (!(input instanceof RelSubset))
-                return Boolean.FALSE;
-
-            RelSubset subset = (RelSubset) input;
-
-            Set<RelTraitSet> traits = subset.getRelList().stream()
-                .filter(r -> r instanceof IgniteRel)
-                .map(RelOptNode::getTraitSet)
-                .collect(Collectors.toSet());
-
-            if (traits.isEmpty())
-                return Boolean.FALSE;
-
-            Set<RelNode> transformed = Collections.newSetFromMap(new IdentityHashMap<>());
-
-            boolean transform = Boolean.FALSE;
-
-            for (RelTraitSet traitSet: traits) {
-                RelNode newRel = RelOptRule.convert(subset, traitSet.simplify());
-
-                if (transformed.add(newRel)) {
-                    RelNode out = transformFun.apply(rel, newRel);
-
-                    if (out != null) {
-                        call.transformTo(out);
-
-                        transform = Boolean.TRUE;
-                    }
-                }
-            }
-
-            return transform;
-        };
-    }
-
     public static <T> List<T> intersect(List<T> left, List<T> right) {
         if (F.isEmpty(left) || F.isEmpty(right))
             return Collections.emptyList();
@@ -171,6 +126,10 @@ public final class Commons {
         return set;
     }
 
+    public static PlannerContext plannerContext(RelNode rel) {
+        return plannerContext(rel.getCluster().getPlanner().getContext());
+    }
+
     public static PlannerContext plannerContext(Context ctx) {
         return Objects.requireNonNull(ctx.unwrap(PlannerContext.class));
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index c7a7081..5104816 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -19,14 +19,14 @@ package org.apache.ignite.internal.processors.query.calcite.util;
 
 import java.lang.reflect.Method;
 import org.apache.calcite.linq4j.tree.Types;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DistributionTraitMetadata;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DerivedDistribution;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
 
 /**
  *
  */
 public enum IgniteMethod {
-    DISTRIBUTION_TRAIT(DistributionTraitMetadata.class, "getDistributionTrait"),
+    DERIVED_DISTRIBUTIONS(DerivedDistribution.class, "deriveDistributions"),
     FRAGMENT_INFO(FragmentMetadata.class, "getFragmentInfo");
 
     private final Method method;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
deleted file mode 100644
index bcbe604..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.util;
-
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-
-/**
- *
- */
-public interface RelImplementor<T> {
-    T implement(IgniteExchange rel);
-    T implement(IgniteFilter rel);
-    T implement(IgniteJoin rel);
-    T implement(IgniteProject rel);
-    T implement(IgniteTableScan rel);
-    T implement(IgniteReceiver rel);
-    T implement(IgniteSender rel);
-    T implement(IgniteRel other);
-}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index a5159d3..ee4c22e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -40,19 +40,20 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.calcite.exec.ConsumerNode;
-import org.apache.ignite.internal.processors.query.calcite.exec.Interpretable;
+import org.apache.ignite.internal.processors.query.calcite.exec.ImplementorImpl;
 import org.apache.ignite.internal.processors.query.calcite.exec.Node;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue;
 import org.apache.ignite.internal.processors.query.calcite.prepare.DataContextImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
+import org.apache.ignite.internal.processors.query.calcite.rel.Implementor;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.Expression;
@@ -61,7 +62,6 @@ import org.apache.ignite.internal.processors.query.calcite.serialize.relation.Re
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
 import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
@@ -70,27 +70,29 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.ignite.internal.processors.query.calcite.exec.Interpretable.INTERPRETABLE;
-
 /**
  *
  */
-@WithSystemProperty(key = "calcite.debug", value = "true")
+//@WithSystemProperty(key = "calcite.debug", value = "true")
 public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
-    private static GridTestKernalContext kernalContext;
-    private static CalciteQueryProcessor proc;
-    private static SchemaPlus schema;
-    private static List<UUID> nodes;
+    private GridTestKernalContext kernalContext;
+    private CalciteQueryProcessor proc;
+    private SchemaPlus schema;
+    private List<UUID> nodes;
+
+    private TestIgniteTable city;
+    private TestIgniteTable country;
+    private TestIgniteTable project;
+    private TestIgniteTable developer;
 
-    @BeforeClass
-    public static void setupClass() {
+    @Before
+    public void setup() {
         kernalContext = new GridTestKernalContext(log);
         proc = new CalciteQueryProcessor();
         proc.setLogger(log);
@@ -98,61 +100,50 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
         IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
 
-        publicSchema.addTable(new IgniteTable("Developer", "Developer",
+        developer = new TestIgniteTable("Developer", "Developer",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("projectId", Integer.class)
                 .field("cityId", Integer.class)
-                .build()){
-            @Override public Enumerable<Object[]> scan(DataContext root) {
-                return Linq4j.asEnumerable(Arrays.asList(
-                    new Object[]{0, null, 0, "Igor", 0, 1},
-                    new Object[]{1, null, 1, "Roman", 0, 0}
-                ));
-            }
-        });
+                .build(), Arrays.asList(
+            new Object[]{0, null, 0, "Igor", 0, 1},
+            new Object[]{1, null, 1, "Roman", 0, 0}
+        ));
 
-        publicSchema.addTable(new IgniteTable("Project", "Project",
+        project = new TestIgniteTable("Project", "Project",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("ver", Integer.class)
-                .build()){
-            @Override public Enumerable<Object[]> scan(DataContext root) {
-                return Linq4j.asEnumerable(Arrays.asList(
-                    new Object[]{0, null, 0, "Calcite", 1},
-                    new Object[]{1, null, 1, "Ignite", 1}
-                ));
-            }
-        });
+                .build(), Arrays.asList(
+            new Object[]{0, null, 0, "Calcite", 1},
+            new Object[]{1, null, 1, "Ignite", 1}
+        ));
 
-        publicSchema.addTable(new IgniteTable("Country", "Country",
+        country = new TestIgniteTable("Country", "Country",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("countryCode", Integer.class)
-                .build()){
-            @Override public Enumerable<Object[]> scan(DataContext root) {
-                return Linq4j.asEnumerable(Arrays.<Object[]>asList(
-                    new Object[]{0, null, 0, "Russia", 7}
-                ));
-            }
-        });
+                .build(), Arrays.<Object[]>asList(
+            new Object[]{0, null, 0, "Russia", 7}
+        ));
 
-        publicSchema.addTable(new IgniteTable("City", "City",
+        city = new TestIgniteTable("City", "City",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("countryId", Integer.class)
-                .build()){
-            @Override public Enumerable<Object[]> scan(DataContext root) {
-                return Linq4j.asEnumerable(Arrays.asList(
-                    new Object[]{0, null, 0, "Moscow", 0},
-                    new Object[]{1, null, 1, "Saint Petersburg", 0}
-                ));
-            }
-        });
+                .build(), Arrays.asList(
+            new Object[]{0, null, 0, "Moscow", 0},
+            new Object[]{1, null, 1, "Saint Petersburg", 0}
+        ));
+
+        publicSchema.addTable(developer);
+        publicSchema.addTable(project);
+        publicSchema.addTable(country);
+        publicSchema.addTable(city);
 
         schema = Frameworks
             .createRootSchema(false)
@@ -366,11 +357,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
             // Transformation chain
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -426,11 +417,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             assertNotNull(rel);
 
@@ -442,7 +433,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
             plan.init(ctx);
 
-            RelGraph graph = new RelToGraphConverter().convert((IgniteRel) plan.fragments().get(1).root());
+            RelGraph graph = new RelToGraphConverter().go((IgniteRel) plan.fragments().get(1).root());
 
             convertedBytes = new JdkMarshaller().marshal(graph);
 
@@ -464,6 +455,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     @Test
     public void testSplitterCollocatedPartitionedPartitioned() throws Exception {
+        Object key = new Object();
+
+        developer.identityKey(key);
+        project.identityKey(key);
+
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
             "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -504,11 +500,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -565,36 +561,17 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             // Transformation chain
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
-            RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
-                .replace(IgniteDistributions.single())
-                .simplify();
-
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
-
-            assertNotNull(relRoot);
-
-            QueryPlan plan = new Splitter().go((IgniteRel) rel);
-
-            assertNotNull(plan);
-
-            plan.init(ctx);
+            RelTraitSet desired = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE);
 
-            assertNotNull(plan);
-
-            assertTrue(plan.fragments().size() == 2);
-
-            desired = rel.getCluster().traitSetOf(INTERPRETABLE);
-
-            RelNode phys = planner.transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, plan.fragments().get(1).root(), desired);
+            RelNode phys = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             assertNotNull(phys);
 
             Map<String, Object> params = ctx.query().params(F.asMap(ContextValue.QUERY_ID.valueName(), new GridCacheVersion()));
 
-            Interpretable.Implementor<Object[]> implementor = new Interpretable.Implementor<>(new DataContextImpl(params, ctx));
+            Implementor<Node<Object[]>> implementor = new ImplementorImpl(new DataContextImpl(params, ctx));
 
-            Node<Object[]> exec = implementor.go(phys.getInput(0));
+            Node<Object[]> exec = implementor.go((IgniteRel) phys);
 
             assertNotNull(exec);
 
@@ -625,12 +602,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        TableDistributionService ds = new TableDistributionService(){
-            @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-                return IgniteDistributions.broadcast();
-            }
-        };
-
         MappingService ms = new MappingService() {
             @Override public NodesMapping random(AffinityTopologyVersion topVer) {
                 return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -650,7 +621,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
         assertNotNull(ctx);
 
         RelTraitDef[] traitDefs = {
@@ -682,11 +653,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -706,6 +677,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     @Test
     public void testSplitterCollocatedReplicatedAndPartitioned() throws Exception {
+        developer.identityKey(new Object());
+
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
             "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -713,15 +686,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        TableDistributionService ds = new TableDistributionService(){
-            @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-                if (cacheId == CU.cacheId("Project"))
-                    return IgniteDistributions.broadcast();
-
-                return IgniteDistributions.hash(rowType.distributionKeys());
-            }
-        };
-
         MappingService ms = new MappingService() {
             @Override public NodesMapping random(AffinityTopologyVersion topVer) {
                 return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -747,7 +711,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
 
         assertNotNull(ctx);
 
@@ -780,11 +744,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -804,6 +768,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     @Test
     public void testSplitterPartiallyCollocated() throws Exception {
+        developer.identityKey(new Object());
+
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
             "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -811,15 +777,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        TableDistributionService ds = new TableDistributionService(){
-            @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-                if (cacheId == CU.cacheId("Project"))
-                    return IgniteDistributions.broadcast();
-
-                return IgniteDistributions.hash(rowType.distributionKeys());
-            }
-        };
-
         MappingService ms = new MappingService() {
             @Override public NodesMapping random(AffinityTopologyVersion topVer) {
                 return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -845,7 +802,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
 
         assertNotNull(ctx);
 
@@ -878,11 +835,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -909,12 +866,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.ver0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        TableDistributionService ds = new TableDistributionService(){
-            @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-                return IgniteDistributions.broadcast();
-            }
-        };
-
         MappingService ms = new MappingService() {
             @Override public NodesMapping random(AffinityTopologyVersion topVer) {
                 return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -935,7 +886,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
 
         assertNotNull(ctx);
 
@@ -968,11 +919,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -992,6 +943,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     @Test
     public void testSplitterPartiallyReplicated1() throws Exception {
+        developer.identityKey(new Object());
+
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
             "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -999,16 +952,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-
-        TableDistributionService ds = new TableDistributionService(){
-            @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-                if (cacheId == CU.cacheId("Project"))
-                    return IgniteDistributions.broadcast();
-
-                return IgniteDistributions.hash(rowType.distributionKeys());
-            }
-        };
-
         MappingService ms = new MappingService() {
             @Override public NodesMapping random(AffinityTopologyVersion topVer) {
                 return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -1034,7 +977,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
 
         assertNotNull(ctx);
 
@@ -1067,11 +1010,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -1091,6 +1034,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     @Test
     public void testSplitterPartiallyReplicated2() throws Exception {
+        developer.identityKey(new Object());
+
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
             "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
@@ -1098,16 +1043,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-
-        TableDistributionService ds = new TableDistributionService() {
-            @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-                if (cacheId == CU.cacheId("Project"))
-                    return IgniteDistributions.broadcast();
-
-                return IgniteDistributions.hash(rowType.distributionKeys());
-            }
-        };
-
         MappingService ms = new MappingService() {
             @Override public NodesMapping random(AffinityTopologyVersion topVer) {
                 return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
@@ -1133,7 +1068,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms));
 
         assertNotNull(ctx);
 
@@ -1166,11 +1101,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
 
             RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -1230,16 +1165,10 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        TableDistributionService ds = new TableDistributionService() {
-            @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-                return IgniteDistributions.hash(rowType.distributionKeys());
-            }
-        };
-
-        return context(c, q, ms, ds);
+        return context(c, q, ms);
     }
 
-    private PlannerContext context(Context parent, Query query, MappingService ms, TableDistributionService ds) {
+    private PlannerContext context(Context parent, Query query, MappingService ms) {
         return PlannerContext.builder()
             .parentContext(parent)
             .logger(log)
@@ -1248,8 +1177,28 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             .query(query)
             .schema(schema)
             .topologyVersion(AffinityTopologyVersion.NONE)
-            .distributionService(ds)
             .mappingService(ms)
             .build();
     }
+
+    public static class TestIgniteTable extends IgniteTable {
+        private final List<Object[]> data;
+        private Object identityKey;
+        public TestIgniteTable(String tableName, String cacheName, RowType rowType, List<Object[]> data) {
+            super(tableName, cacheName, rowType, null);
+            this.data = data;
+        }
+
+        public void identityKey(Object identityKey) {
+            this.identityKey = identityKey;
+        }
+
+        @Override public Object identityKey() {
+            return identityKey;
+        }
+
+        @Override public Enumerable<Object[]> scan(DataContext root) {
+            return Linq4j.asEnumerable(data);
+        }
+    }
 }
\ No newline at end of file