You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/06/09 09:14:43 UTC

[ignite-3] branch main updated: IGNITE-14834 Move mapping and splitter related code to Ignite 3.0 (#166)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 67e8ede  IGNITE-14834 Move mapping and splitter related code to Ignite 3.0 (#166)
67e8ede is described below

commit 67e8ede31e812b38feed1838db34bf9e18328cf1
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Wed Jun 9 12:14:37 2021 +0300

    IGNITE-14834 Move mapping and splitter related code to Ignite 3.0 (#166)
---
 modules/calcite/pom.xml                            |    5 +
 .../internal/processors/query/calcite/Stubs.java   |    5 +
 .../{Stubs.java => externalize/RelInputEx.java}    |   30 +-
 .../query/calcite/externalize/RelJson.java         |  918 ++++++++++
 .../query/calcite/externalize/RelJsonReader.java   |  330 ++++
 .../query/calcite/externalize/RelJsonWriter.java   |  168 ++
 .../query/calcite/metadata/ColocationGroup.java    |  230 +++
 .../ColocationMappingException.java}               |   25 +-
 .../query/calcite/metadata/FragmentMapping.java    |  147 ++
 .../calcite/metadata/FragmentMappingException.java |   58 +
 .../calcite/metadata/IgniteMdFragmentMapping.java  |  226 +++
 .../query/calcite/metadata/IgniteMetadata.java     |   24 +
 .../query/calcite/metadata/MappingService.java     |   41 +
 .../NodeMappingException.java}                     |   35 +-
 .../query/calcite/metadata/RelMetadataQueryEx.java |   28 +
 .../calcite/prepare/AbstractMultiStepPlan.java     |  101 ++
 .../processors/query/calcite/prepare/Cloner.java   |  257 +++
 .../{Stubs.java => prepare/ExecutionPlan.java}     |   32 +-
 .../processors/query/calcite/prepare/Fragment.java |  193 ++
 .../query/calcite/prepare/FragmentSplitter.java    |  144 ++
 .../{Stubs.java => prepare/IdGenerator.java}       |   21 +-
 .../query/calcite/prepare/MultiStepPlan.java       |   54 +
 .../MultiStepQueryPlan.java}                       |   29 +-
 .../query/calcite/prepare/PlanningContext.java     |   31 +-
 .../calcite/{Stubs.java => prepare/QueryPlan.java} |   31 +-
 .../query/calcite/prepare/QueryTemplate.java       |  136 ++
 .../processors/query/calcite/prepare/Splitter.java |  125 ++
 .../query/calcite/rel/AbstractIgniteSpool.java     |   47 +
 .../query/calcite/rel/IgniteHashIndexSpool.java    |    4 +-
 .../query/calcite/rel/IgniteMergeJoin.java         |    5 +-
 .../query/calcite/rel/IgniteSortedIndexSpool.java  |    4 +-
 .../query/calcite/rel/IgniteTableFunctionScan.java |   11 +
 .../query/calcite/rel/IgniteTableModify.java       |    2 +-
 .../query/calcite/rel/IgniteTableSpool.java        |    5 +-
 .../query/calcite/schema/IgniteTable.java          |   10 +
 .../query/calcite/util/IgniteMethod.java           |   50 +
 .../query/calcite/planner/AbstractPlannerTest.java |   96 +
 .../query/calcite/planner/PlannerTest.java         | 1847 ++++++++++++++++++++
 .../apache/ignite/internal/util/ArrayUtils.java    |   56 +
 .../ignite/internal/util/IgniteIntIterator.java}   |   28 +-
 .../apache/ignite/internal/util/IgniteIntList.java |  526 ++++++
 .../apache/ignite/internal/util/IgniteUtils.java   |   82 +
 .../internal/testframework/IgniteTestUtils.java    |  116 ++
 43 files changed, 6194 insertions(+), 119 deletions(-)

diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index 8bce051..c44ae05 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -46,6 +46,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-network-api</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-core</artifactId>
         </dependency>
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
index 01746a7..ce7d926 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
@@ -24,6 +24,11 @@ public class Stubs {
     }
 
     /** */
+    public static long longFoo(Object... args) {
+        return args == null ? 0 : args.length;
+    }
+
+    /** */
     public static boolean boolFoo(Object... args) {
         return args == null;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelInputEx.java
similarity index 56%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelInputEx.java
index 01746a7..221809c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelInputEx.java
@@ -2,11 +2,11 @@
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
+ * The ASF licenses this file to you under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,22 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.externalize;
 
-/** Stubs */
-public class Stubs {
-    /** */
-    public static int intFoo(Object... args) {
-        return args == null ? 0 : args.length;
-    }
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelInput;
 
-    /** */
-    public static boolean boolFoo(Object... args) {
-        return args == null;
-    }
-
-    /** */
-    public static String stringFoo(Object... args) {
-        return args == null ? "null" : "not null";
-    }
+/** Extension of {@link RelInput} that can additionally return a {@link RelCollation} by tag. */
+public interface RelInputEx extends RelInput {
+    /**
+     * @param tag Tag.
+     * @return A collation value.
+     */
+    RelCollation getCollation(String tag);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
new file mode 100644
index 0000000..f8a5b7f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -0,0 +1,918 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.externalize;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Modifier;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.AvaticaUtils;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactoryImpl.JavaType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexSlot;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVariable;
+import org.apache.calcite.rex.RexWindow;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsertKeyword;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+import org.apache.calcite.sql.SqlJsonQueryWrapperBehavior;
+import org.apache.calcite.sql.SqlJsonValueEmptyOrErrorBehavior;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlNameMatchers;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
+
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.apache.ignite.internal.util.IgniteUtils.igniteClassLoader;
+
+/**
+ * Utilities for converting {@link RelNode} into JSON format.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+class RelJson {
+    /** */
+    private final RelOptCluster cluster;
+
+    /** Factory that produces a {@link RelNode} from a {@link RelInput}. */
+    @SuppressWarnings("PublicInnerClass") @FunctionalInterface
+    public static interface RelFactory extends Function<RelInput, RelNode> {
+        /** {@inheritDoc} */
+        @Override RelNode apply(RelInput input);
+    }
+
+    /** Cache of factories keyed by type name; missing entries are loaded via {@link #relFactory(String)}. */
+    private static final LoadingCache<String, RelFactory> FACTORIES_CACHE = CacheBuilder.newBuilder()
+        .build(CacheLoader.from(RelJson::relFactory));
+
+    /** Resolves {@code typeName} to a class and generates a factory that invokes its {@code (RelInput)} constructor. */
+    private static RelFactory relFactory(String typeName) {
+        Class<?> clazz = null;
+
+        if (!typeName.contains(".")) { // Short name: probe the known package prefixes first.
+            for (String package_ : PACKAGES) {
+                if ((clazz = classForName(package_ + typeName, true)) != null)
+                    break;
+            }
+        }
+
+        if (clazz == null)
+            clazz = classForName(typeName, false);
+
+        assert RelNode.class.isAssignableFrom(clazz);
+
+        Constructor<RelNode> constructor;
+
+        try {
+            constructor = (Constructor<RelNode>)clazz.getConstructor(RelInput.class);
+        }
+        catch (NoSuchMethodException e) {
+            throw new IgniteException("class does not have required constructor, "
+                + clazz + "(RelInput)", e); // Preserve the original cause for diagnostics.
+        }
+
+        BlockBuilder builder = new BlockBuilder();
+        ParameterExpression input_ = Expressions.parameter(RelInput.class);
+        builder.add(Expressions.new_(constructor, input_));
+        MethodDeclaration declaration = Expressions.methodDecl(
+            Modifier.PUBLIC, RelNode.class, "apply", asList(input_), builder.toBlock());
+        return Commons.compile(RelFactory.class, Expressions.toString(asList(declaration), "\n", true));
+    }
+
+    /** Enum constants keyed by {@code SimpleClassName#CONSTANT_NAME}; populated in the static initializer. */
+    private static final ImmutableMap<String, Enum<?>> ENUM_BY_NAME;
+
+    /** */
+    static {
+        // Build a mapping from qualified enum constant names (e.g. "Flag#LEADING")
+        // to the enum constants themselves; see register(). If two entries end up
+        // with the same key, the builder will throw.
+        final ImmutableMap.Builder<String, Enum<?>> enumByName =
+            ImmutableMap.builder();
+
+        register(enumByName, JoinConditionType.class);
+        register(enumByName, JoinType.class);
+        register(enumByName, Direction.class);
+        register(enumByName, NullDirection.class);
+        register(enumByName, SqlTypeName.class);
+        register(enumByName, SqlKind.class);
+        register(enumByName, SqlSyntax.class);
+        register(enumByName, SqlExplain.Depth.class);
+        register(enumByName, SqlExplainFormat.class);
+        register(enumByName, SqlExplainLevel.class);
+        register(enumByName, SqlInsertKeyword.class);
+        register(enumByName, SqlJsonConstructorNullClause.class);
+        register(enumByName, SqlJsonQueryWrapperBehavior.class);
+        register(enumByName, SqlJsonValueEmptyOrErrorBehavior.class);
+        register(enumByName, SqlMatchRecognize.AfterOption.class);
+        register(enumByName, SqlSelectKeyword.class);
+        register(enumByName, SqlTrimFunction.Flag.class);
+        register(enumByName, TimeUnitRange.class);
+
+        ENUM_BY_NAME = enumByName.build();
+    }
+
+    /** Registers every constant of {@code aClass} in {@code builder} under the key {@code SimpleClassName#CONSTANT_NAME}. */
+    private static void register(ImmutableMap.Builder<String, Enum<?>> builder, Class<? extends Enum> aClass) {
+        String prefix = aClass.getSimpleName() + "#";
+        for (Enum constant : aClass.getEnumConstants())
+            builder.put(prefix + constant.name(), constant);
+    }
+
+    /** Loads a class by name; a missing class yields {@code null} if {@code skipNotFound} is set, an exception otherwise. */
+    private static Class<?> classForName(String typeName, boolean skipNotFound) {
+        try {
+            return IgniteUtils.forName(typeName, igniteClassLoader());
+        }
+        catch (ClassNotFoundException e) {
+            if (!skipNotFound)
+                throw new IgniteException("unknown type " + typeName, e); // Keep the cause.
+        }
+
+        return null;
+    }
+
+    /** Name prefixes probed, in order, for type names without a dot, and stripped again by {@code classToTypeName}. */
+    private static final List<String> PACKAGES =
+        ImmutableList.of(
+            "org.apache.ignite.internal.processors.query.calcite.rel.",
+            "org.apache.ignite.internal.processors.query.calcite.rel.agg.",
+            "org.apache.ignite.internal.processors.query.calcite.rel.set.",
+            "org.apache.calcite.rel.",
+            "org.apache.calcite.rel.core.",
+            "org.apache.calcite.rel.logical.",
+            "org.apache.calcite.adapter.jdbc.",
+            "org.apache.calcite.adapter.jdbc.JdbcRules$");
+
+    /** @param cluster Cluster stored in this instance. */
+    RelJson(RelOptCluster cluster) {
+        this.cluster = cluster;
+    }
+
+    /** Returns the factory for the given type name, taken from {@link #FACTORIES_CACHE}. */
+    Function<RelInput, RelNode> factory(String type) {
+        return FACTORIES_CACHE.getUnchecked(type);
+    }
+
+    /** Returns the simple name for {@link IgniteRel}s and for classes directly under a known prefix, else the full name. */
+    String classToTypeName(Class<? extends RelNode> class_) {
+        if (IgniteRel.class.isAssignableFrom(class_))
+            return class_.getSimpleName();
+
+        String canonicalName = class_.getName();
+        for (String package_ : PACKAGES) {
+            if (canonicalName.startsWith(package_)) {
+                String remaining = canonicalName.substring(package_.length());
+                if (remaining.indexOf('.') < 0 && remaining.indexOf('$') < 0) // Neither a sub-package nor a nested class.
+                    return remaining;
+            }
+        }
+        return canonicalName;
+    }
+
+    /** Converts a value into its JSON form; throws {@link UnsupportedOperationException} for unsupported types. */
+    Object toJson(Object value) {
+        if (value == null
+            || value instanceof Number
+            || value instanceof String
+            || value instanceof Boolean)
+            return value;
+        else if (value instanceof Enum)
+            return toJson((Enum)value);
+        else if (value instanceof RexNode)
+            return toJson((RexNode)value);
+        else if (value instanceof RexWindow)
+            return toJson((RexWindow)value);
+        else if (value instanceof RexFieldCollation)
+            return toJson((RexFieldCollation)value);
+        else if (value instanceof RexWindowBound)
+            return toJson((RexWindowBound)value);
+        else if (value instanceof CorrelationId)
+            return toJson((CorrelationId)value);
+        else if (value instanceof List) {
+            List<Object> list = list();
+            for (Object o : (Iterable)value)
+                list.add(toJson(o));
+            return list;
+        }
+        else if (value instanceof ImmutableBitSet) { // Written out as a list of the iterated integers.
+            List<Object> list = list();
+            for (Integer integer : (Iterable<Integer>)value)
+                list.add(toJson(integer));
+            return list;
+        }
+        else if (value instanceof Set) {
+            Set<Object> set = set();
+            for (Object o : (Iterable)value)
+                set.add(toJson(o));
+            return set;
+        }
+        else if (value instanceof DistributionTrait)
+            return toJson((DistributionTrait)value);
+        else if (value instanceof AggregateCall)
+            return toJson((AggregateCall)value);
+        else if (value instanceof RelCollationImpl)
+            return toJson((RelCollationImpl)value);
+        else if (value instanceof RelDataType)
+            return toJson((RelDataType)value);
+        else if (value instanceof RelDataTypeField)
+            return toJson((RelDataTypeField)value);
+        else
+            throw new UnsupportedOperationException("type not serializable: "
+                + value + " (type " + value.getClass().getCanonicalName() + ")");
+    }
+
+    /** Builds a collation from JSON field collations; {@code null} input yields {@link RelCollations#EMPTY}. */
+    RelCollation toCollation(List<Map<String, Object>> jsonFieldCollations) {
+        if (jsonFieldCollations == null)
+            return RelCollations.EMPTY;
+
+        List<RelFieldCollation> fieldCollations = jsonFieldCollations.stream()
+            .map(this::toFieldCollation)
+            .collect(Collectors.toList());
+
+        return RelCollations.of(fieldCollations);
+    }
+
+    /** Restores a distribution from JSON: either a name string or a map with "keys" and an optional "cacheId". */
+    IgniteDistribution toDistribution(Object distribution) {
+        if (distribution instanceof String) {
+            switch ((String)distribution) {
+                case "single":
+                    return IgniteDistributions.single();
+                case "any":
+                    return IgniteDistributions.any();
+                case "broadcast":
+                    return IgniteDistributions.broadcast();
+                case "random":
+                    return IgniteDistributions.random();
+            }
+        }
+
+        Map<String, Object> map = (Map<String, Object>)distribution; // NOTE(review): an unrecognized string reaches this cast.
+        Number cacheId = (Number)map.get("cacheId");
+        if (cacheId != null)
+            return IgniteDistributions.hash((List<Integer>)map.get("keys"),
+                DistributionFunction.affinity(cacheId.intValue(), cacheId));
+
+        return IgniteDistributions.hash((List<Integer>)map.get("keys"), DistributionFunction.hash());
+    }
+
+    /** Restores a {@link RelDataType} from JSON: a list of fields, a map describing one type, or an enum key. */
+    RelDataType toType(RelDataTypeFactory typeFactory, Object o) {
+        if (o instanceof List) {
+            List<Map<String, Object>> jsonList = (List<Map<String, Object>>)o;
+            RelDataTypeFactory.Builder builder = typeFactory.builder();
+            for (Map<String, Object> jsonMap : jsonList)
+                builder.add((String)jsonMap.get("name"), toType(typeFactory, jsonMap));
+            return builder.build();
+        }
+        else if (o instanceof Map) {
+            Map<String, Object> map = (Map<String, Object>)o;
+            String clazz = (String)map.get("class");
+
+            if (clazz != null) {
+                RelDataType type = typeFactory.createJavaType(classForName(clazz, false));
+
+                if (Boolean.TRUE.equals(map.get("nullable"))) // Value equality: do not rely on Boolean identity.
+                    type = typeFactory.createTypeWithNullability(type, true);
+
+                return type;
+            }
+
+            Object fields = map.get("fields");
+
+            if (fields != null)
+                return toType(typeFactory, fields);
+            else {
+                SqlTypeName sqlTypeName = toEnum(map.get("type"));
+                Integer precision = (Integer)map.get("precision");
+                Integer scale = (Integer)map.get("scale");
+                if (SqlTypeName.INTERVAL_TYPES.contains(sqlTypeName)) {
+                    TimeUnit startUnit = sqlTypeName.getStartUnit();
+                    TimeUnit endUnit = sqlTypeName.getEndUnit();
+                    return typeFactory.createSqlIntervalType(
+                        new SqlIntervalQualifier(startUnit, endUnit, SqlParserPos.ZERO));
+                }
+                else if (sqlTypeName == SqlTypeName.ARRAY)
+                    return typeFactory.createArrayType(toType(typeFactory, map.get("elementType")), -1);
+                RelDataType type;
+                if (precision == null)
+                    type = typeFactory.createSqlType(sqlTypeName);
+                else if (scale == null)
+                    type = typeFactory.createSqlType(sqlTypeName, precision);
+                else
+                    type = typeFactory.createSqlType(sqlTypeName, precision, scale);
+
+                if (Boolean.TRUE.equals(map.get("nullable"))) // Value equality: do not rely on Boolean identity.
+                    type = typeFactory.createTypeWithNullability(type, true);
+
+                return type;
+            }
+        }
+        else {
+            SqlTypeName sqlTypeName = toEnum(o);
+            return typeFactory.createSqlType(sqlTypeName);
+        }
+    }
+
+    /** Restores a {@link RexNode} from its JSON form; returns {@code null} for {@code null} input. */
+    RexNode toRex(RelInput relInput, Object o) {
+        RelOptCluster cluster = relInput.getCluster();
+        RexBuilder rexBuilder = cluster.getRexBuilder();
+        if (o == null)
+            return null;
+        else if (o instanceof Map) {
+            Map map = (Map)o;
+            Map<String, Object> opMap = (Map)map.get("op");
+            IgniteTypeFactory typeFactory = Commons.typeFactory(cluster);
+            if (opMap != null) {
+                if (map.containsKey("class"))
+                    opMap.put("class", map.get("class"));
+                List operands = (List)map.get("operands");
+                List<RexNode> rexOperands = toRexList(relInput, operands);
+                Object jsonType = map.get("type");
+                Map window = (Map)map.get("window");
+                if (window != null) {
+                    SqlAggFunction operator = (SqlAggFunction)toOp(opMap);
+                    RelDataType type = toType(typeFactory, jsonType);
+                    List<RexNode> partitionKeys = new ArrayList<>();
+                    if (window.containsKey("partition"))
+                        partitionKeys = toRexList(relInput, (List)window.get("partition"));
+                    List<RexFieldCollation> orderKeys = new ArrayList<>();
+                    if (window.containsKey("order"))
+                        orderKeys = toRexFieldCollationList(relInput, (List)window.get("order"));
+                    RexWindowBound lowerBound;
+                    RexWindowBound upperBound;
+                    boolean physical;
+                    if (window.get("rows-lower") != null) {
+                        lowerBound = toRexWindowBound(relInput, (Map)window.get("rows-lower"));
+                        upperBound = toRexWindowBound(relInput, (Map)window.get("rows-upper"));
+                        physical = true;
+                    }
+                    else if (window.get("range-lower") != null) {
+                        lowerBound = toRexWindowBound(relInput, (Map)window.get("range-lower"));
+                        upperBound = toRexWindowBound(relInput, (Map)window.get("range-upper"));
+                        physical = false;
+                    }
+                    else {
+                        // No ROWS or RANGE clause
+                        lowerBound = null;
+                        upperBound = null;
+                        physical = false;
+                    }
+                    boolean distinct = Boolean.TRUE.equals(map.get("distinct")); // Absent flag means not distinct (no NPE).
+                    return rexBuilder.makeOver(type, operator, rexOperands, partitionKeys,
+                        ImmutableList.copyOf(orderKeys), lowerBound, upperBound, physical,
+                        true, false, distinct, false);
+                }
+                else {
+                    SqlOperator operator = toOp(opMap);
+                    RelDataType type;
+                    if (jsonType != null)
+                        type = toType(typeFactory, jsonType);
+                    else
+                        type = rexBuilder.deriveReturnType(operator, rexOperands);
+                    return rexBuilder.makeCall(type, operator, rexOperands);
+                }
+            }
+            Integer input = (Integer)map.get("input");
+            if (input != null) {
+                // Check if it is a local ref.
+                if (map.containsKey("type")) {
+                    RelDataType type = toType(typeFactory, map.get("type"));
+                    return Boolean.TRUE.equals(map.get("dynamic"))
+                        ? rexBuilder.makeDynamicParam(type, input)
+                        : rexBuilder.makeLocalRef(type, input);
+                }
+
+                List<RelNode> inputNodes = relInput.getInputs();
+                int i = input;
+                for (RelNode inputNode : inputNodes) { // Locate the input whose row type contains the field.
+                    RelDataType rowType = inputNode.getRowType();
+                    if (i < rowType.getFieldCount()) {
+                        RelDataTypeField field = rowType.getFieldList().get(i);
+                        return rexBuilder.makeInputRef(field.getType(), input);
+                    }
+                    i -= rowType.getFieldCount();
+                }
+                throw new RuntimeException("input field " + input + " is out of range");
+            }
+
+            String field = (String)map.get("field");
+            if (field != null) {
+                Object jsonExpr = map.get("expr");
+                RexNode expr = toRex(relInput, jsonExpr);
+                return rexBuilder.makeFieldAccess(expr, field, true);
+            }
+
+            String correl = (String)map.get("correl");
+            if (correl != null) {
+                RelDataType type = toType(typeFactory, map.get("type"));
+                return rexBuilder.makeCorrel(type, new CorrelationId(correl));
+            }
+
+            if (map.containsKey("literal")) {
+                Object literal = map.get("literal");
+                RelDataType type = toType(typeFactory, map.get("type"));
+
+                if (literal == null)
+                    return rexBuilder.makeNullLiteral(type);
+
+                if (type.getSqlTypeName() == SqlTypeName.SYMBOL)
+                    literal = toEnum(literal);
+
+                return rexBuilder.makeLiteral(literal, type, false);
+            }
+
+            throw new UnsupportedOperationException("cannot convert to rex " + o);
+        }
+        else if (o instanceof Boolean)
+            return rexBuilder.makeLiteral((Boolean)o);
+        else if (o instanceof String)
+            return rexBuilder.makeLiteral((String)o);
+        else if (o instanceof Number) {
+            Number number = (Number)o;
+            if (number instanceof Double || number instanceof Float)
+                return rexBuilder.makeApproxLiteral(
+                    BigDecimal.valueOf(number.doubleValue()));
+            else
+                return rexBuilder.makeExactLiteral(
+                    BigDecimal.valueOf(number.longValue()));
+        }
+        else
+            throw new UnsupportedOperationException("cannot convert to rex " + o);
+    }
+
+    /** Looks up an operator by name, syntax and kind, else instantiates the "class" entry; {@code null} if neither works. */
+    SqlOperator toOp(Map<String, Object> map) {
+        // in case different operator has the same kind, check with both name and kind.
+        String name = map.get("name").toString();
+        SqlKind sqlKind = toEnum(map.get("kind"));
+        SqlSyntax sqlSyntax = toEnum(map.get("syntax"));
+        List<SqlOperator> operators = new ArrayList<>();
+
+        FRAMEWORK_CONFIG.getOperatorTable().lookupOperatorOverloads(
+            new SqlIdentifier(name, new SqlParserPos(0, 0)),
+            null,
+            sqlSyntax,
+            operators,
+            SqlNameMatchers.liberal()
+        );
+
+        for (SqlOperator operator : operators)
+            if (operator.kind == sqlKind)
+                return operator;
+        String class_ = (String)map.get("class");
+        if (class_ != null)
+            return AvaticaUtils.instantiatePlugin(SqlOperator.class, class_);
+        return null;
+    }
+
+    /** @return New empty {@link ArrayList}. */
+    <T> List<T> list() {
+        return new ArrayList<>();
+    }
+
+    /** @return New empty {@link LinkedHashSet}. */
+    <T> Set<T> set() {
+        return new LinkedHashSet<>();
+    }
+
+    /** @return New empty {@link LinkedHashMap} with string keys. */
+    <T> Map<String, T> map() {
+        return new LinkedHashMap<>();
+    }
+
+    /** Resolves an enum constant from a map with "class" and "name" entries, or from an {@link #ENUM_BY_NAME} key. */
+    private <T extends Enum<T>> T toEnum(Object o) {
+        if (o instanceof Map) {
+            Map<String, Object> map = (Map<String, Object>)o;
+            String class_ = (String)map.get("class");
+            String name = map.get("name").toString();
+            return Util.enumVal((Class<T>)classForName(class_, false), name);
+        }
+
+        assert o instanceof String && ENUM_BY_NAME.containsKey(o); // Only checked when assertions are enabled.
+
+        String name = (String)o;
+        return (T)ENUM_BY_NAME.get(name);
+    }
+
+    /**
+     * Builds a field collation from the {@code field}, {@code direction} and {@code nulls} attributes of the given map.
+     *
+     * @param map Collation attributes.
+     * @return Field collation.
+     */
+    private RelFieldCollation toFieldCollation(Map<String, Object> map) {
+        Integer fieldIdx = (Integer)map.get("field");
+        Direction dir = toEnum(map.get("direction"));
+        NullDirection nullDir = toEnum(map.get("nulls"));
+
+        return new RelFieldCollation(fieldIdx, dir, nullDir);
+    }
+
+    /**
+     * Converts the JSON form of window order keys into a list of field collations.
+     *
+     * @param relInput Input used to convert the key expressions.
+     * @param order JSON order keys, may be {@code null}.
+     * @return List of field collations, or {@code null} when {@code order} is {@code null}.
+     */
+    private List<RexFieldCollation> toRexFieldCollationList(RelInput relInput, List<Map<String, Object>> order) {
+        if (order == null)
+            return null;
+
+        List<RexFieldCollation> res = new ArrayList<>();
+
+        for (Map<String, Object> item : order) {
+            RexNode expr = toRex(relInput, item.get("expr"));
+
+            Set<SqlKind> flags = new HashSet<>();
+
+            if (toEnum(item.get("direction")) == Direction.DESCENDING)
+                flags.add(SqlKind.DESCENDING);
+
+            // Exactly one of the two null-ordering flags is always present.
+            boolean nullsFirst = toEnum(item.get("null-direction")) == NullDirection.FIRST;
+
+            flags.add(nullsFirst ? SqlKind.NULLS_FIRST : SqlKind.NULLS_LAST);
+
+            res.add(new RexFieldCollation(expr, flags));
+        }
+
+        return res;
+    }
+
+    /**
+     * Converts the JSON form of a window bound into a {@link RexWindowBound}.
+     *
+     * @param input Input used to convert the offset expression and to reach the rex builder.
+     * @param map Bound attributes ({@code type} and, for bounded variants, {@code offset}), may be {@code null}.
+     * @return Window bound, or {@code null} when {@code map} is {@code null}.
+     * @throws UnsupportedOperationException If the bound type is not one of the known ones.
+     */
+    private RexWindowBound toRexWindowBound(RelInput input, Map<String, Object> map) {
+        if (map == null)
+            return null;
+
+        String type = (String)map.get("type");
+
+        switch (type) {
+            case "CURRENT_ROW":
+                return RexWindowBounds.create(SqlWindow.createCurrentRow(SqlParserPos.ZERO), null);
+
+            case "UNBOUNDED_PRECEDING":
+                return RexWindowBounds.create(SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO), null);
+
+            case "UNBOUNDED_FOLLOWING":
+                return RexWindowBounds.create(SqlWindow.createUnboundedFollowing(SqlParserPos.ZERO), null);
+
+            case "PRECEDING":
+            case "FOLLOWING": {
+                // Both bounded variants wrap the offset into a call; only the operator differs.
+                RexNode offset = toRex(input, map.get("offset"));
+
+                SqlOperator boundOp = "PRECEDING".equals(type)
+                    ? SqlWindow.PRECEDING_OPERATOR
+                    : SqlWindow.FOLLOWING_OPERATOR;
+
+                return RexWindowBounds.create(null, input.getCluster().getRexBuilder().makeCall(boundOp, offset));
+            }
+
+            default:
+                throw new UnsupportedOperationException("cannot convert type to rex window bound " + type);
+        }
+    }
+
+    /**
+     * Converts every JSON operand into a row expression, keeping the order of the operands.
+     *
+     * @param relInput Input used for the conversion.
+     * @param operands JSON operands.
+     * @return Converted expressions.
+     */
+    private List<RexNode> toRexList(RelInput relInput, List<?> operands) {
+        List<RexNode> res = new ArrayList<>();
+
+        operands.forEach(operand -> res.add(toRex(relInput, operand)));
+
+        return res;
+    }
+
+    /**
+     * Serializes an enum constant.
+     *
+     * @param enum0 Enum constant.
+     * @return The short {@code SimpleClassName#NAME} key when {@code ENUM_BY_NAME} maps that key to this very constant,
+     *      otherwise a map with the {@code class} and {@code name} attributes.
+     */
+    private Object toJson(Enum<?> enum0) {
+        Class<?> declaring = enum0.getDeclaringClass();
+        String shortKey = declaring.getSimpleName() + "#" + enum0.name();
+
+        if (ENUM_BY_NAME.get(shortKey) != enum0) {
+            // The short key would not resolve back to this constant, so the full class name is written instead.
+            Map<String, Object> desc = map();
+
+            desc.put("class", declaring.getName());
+            desc.put("name", enum0.name());
+
+            return desc;
+        }
+
+        return shortKey;
+    }
+
+    /**
+     * Serializes an aggregate call as a map with the {@code agg}, {@code type}, {@code distinct}, {@code operands},
+     * {@code filter} and {@code name} attributes.
+     *
+     * @param node Aggregate call.
+     * @return JSON map.
+     */
+    private Object toJson(AggregateCall node) {
+        Map<String, Object> res = map();
+
+        res.put("agg", toJson(node.getAggregation()));
+        res.put("type", toJson(node.getType()));
+        res.put("distinct", node.isDistinct());
+        res.put("operands", node.getArgList());
+        res.put("filter", node.filterArg);
+        res.put("name", node.getName());
+
+        return res;
+    }
+
+    /**
+     * Serializes a type.
+     *
+     * <p>A Java type becomes a map with the {@code class} attribute, a struct becomes a list of its fields, an array
+     * becomes a map with the {@code type} and {@code elementType} attributes. Any other type becomes a map with
+     * {@code type} plus, where applicable, {@code nullable}, {@code precision} and {@code scale}.
+     *
+     * @param node Type.
+     * @return JSON value.
+     */
+    private Object toJson(RelDataType node) {
+        if (node instanceof JavaType) {
+            Map<String, Object> javaDesc = map();
+
+            javaDesc.put("class", ((JavaType)node).getJavaClass().getName());
+
+            if (node.isNullable())
+                javaDesc.put("nullable", true);
+
+            return javaDesc;
+        }
+
+        if (node.isStruct()) {
+            List<Object> fields = list();
+
+            for (RelDataTypeField fld : node.getFieldList())
+                fields.add(toJson(fld));
+
+            return fields;
+        }
+
+        SqlTypeName typeName = node.getSqlTypeName();
+
+        Map<String, Object> desc = map();
+
+        desc.put("type", toJson(typeName));
+
+        if (typeName == SqlTypeName.ARRAY) {
+            desc.put("elementType", toJson(node.getComponentType()));
+
+            return desc;
+        }
+
+        if (node.isNullable())
+            desc.put("nullable", true);
+
+        if (typeName.allowsPrec())
+            desc.put("precision", node.getPrecision());
+
+        if (typeName.allowsScale())
+            desc.put("scale", node.getScale());
+
+        return desc;
+    }
+
+    /**
+     * Serializes a field: the JSON form of its type extended with the {@code name} attribute. The type of a struct
+     * field is nested under the {@code fields} attribute.
+     *
+     * @param node Field.
+     * @return JSON map.
+     */
+    private Object toJson(RelDataTypeField node) {
+        RelDataType fieldType = node.getType();
+
+        Map<String, Object> res;
+
+        if (!fieldType.isStruct())
+            res = (Map<String, Object>)toJson(fieldType);
+        else {
+            res = map();
+
+            res.put("fields", toJson(fieldType));
+        }
+
+        res.put("name", node.getName());
+
+        return res;
+    }
+
+    /** Serializes a correlation id as the value returned by its {@code getId()} method. */
+    private Object toJson(CorrelationId node) {
+        return node.getId();
+    }
+
+    /** Serializes a row expression as a map whose attributes depend on the kind of the node; throws {@link UnsupportedOperationException} for an unknown node. */
+    private Object toJson(RexNode node) {
+        // Removes calls to SEARCH and the included Sarg and converts them to comparisons before serializing.
+        node = RexUtil.expandSearch(cluster.getRexBuilder(), null, node);
+
+        Map<String, Object> map;
+        switch (node.getKind()) {
+            case FIELD_ACCESS:
+                map = map();
+                RexFieldAccess fieldAccess = (RexFieldAccess)node;
+                map.put("field", fieldAccess.getField().getName());
+                map.put("expr", toJson(fieldAccess.getReferenceExpr()));
+
+                return map;
+            case LITERAL:
+                RexLiteral literal = (RexLiteral)node;
+                Object value = literal.getValue3();
+                map = map();
+                map.put("literal", toJson(value));
+                map.put("type", toJson(node.getType()));
+
+                return map;
+            case INPUT_REF:
+                map = map();
+                map.put("input", ((RexSlot)node).getIndex());
+                map.put("name", ((RexVariable)node).getName());
+
+                return map;
+            case DYNAMIC_PARAM:
+                map = map();
+                map.put("input", ((RexDynamicParam)node).getIndex());
+                map.put("name", ((RexVariable)node).getName());
+                map.put("type", toJson(node.getType()));
+                map.put("dynamic", true); // Marks the entry as a dynamic parameter.
+
+                return map;
+            case LOCAL_REF:
+                map = map();
+                map.put("input", ((RexSlot)node).getIndex());
+                map.put("name", ((RexVariable)node).getName());
+                map.put("type", toJson(node.getType()));
+
+                return map;
+            case CORREL_VARIABLE:
+                map = map();
+                map.put("correl", ((RexVariable)node).getName());
+                map.put("type", toJson(node.getType()));
+
+                return map;
+            default: // Every remaining kind is supported only when the node is a call.
+                if (node instanceof RexCall) {
+                    RexCall call = (RexCall)node;
+                    map = map();
+                    map.put("op", toJson(call.getOperator()));
+                    List<Object> list = list();
+
+                    for (RexNode operand : call.getOperands())
+                        list.add(toJson(operand));
+
+                    map.put("operands", list);
+
+                    if (node.getKind() == SqlKind.CAST) // The target type of a cast is written explicitly.
+                        map.put("type", toJson(node.getType()));
+
+                    if (call.getOperator() instanceof SqlFunction)
+                        if (((SqlFunction)call.getOperator()).getFunctionType().isUserDefined()) {
+                            SqlOperator op = call.getOperator();
+                            map.put("class", op.getClass().getName());
+                            map.put("type", toJson(node.getType()));
+                            map.put("deterministic", op.isDeterministic());
+                            map.put("dynamic", op.isDynamicFunction());
+                        }
+
+                    if (call instanceof RexOver) { // Windowed calls additionally carry the distinct flag and the window.
+                        RexOver over = (RexOver)call;
+                        map.put("distinct", over.isDistinct());
+                        map.put("type", toJson(node.getType()));
+                        map.put("window", toJson(over.getWindow()));
+                    }
+
+                    return map;
+                }
+                throw new UnsupportedOperationException("unknown rex " + node);
+        }
+    }
+
+    /**
+     * Serializes a window definition.
+     *
+     * <p>The {@code partition} and {@code order} attributes are written only for non-empty key lists. Bounds are
+     * written under {@code rows-*} or {@code range-*} names depending on {@code isRows()}; nothing is written
+     * for bounds when the lower bound is absent.
+     *
+     * @param window Window.
+     * @return JSON map.
+     */
+    private Object toJson(RexWindow window) {
+        Map<String, Object> res = map();
+
+        if (!window.partitionKeys.isEmpty())
+            res.put("partition", toJson(window.partitionKeys));
+
+        if (!window.orderKeys.isEmpty())
+            res.put("order", toJson(window.orderKeys));
+
+        // No lower bound means there is no ROWS or RANGE clause at all.
+        if (window.getLowerBound() == null)
+            return res;
+
+        boolean rows = window.isRows();
+
+        res.put(rows ? "rows-lower" : "range-lower", toJson(window.getLowerBound()));
+
+        if (window.getUpperBound() != null)
+            res.put(rows ? "rows-upper" : "range-upper", toJson(window.getUpperBound()));
+
+        return res;
+    }
+
+    /**
+     * Serializes a distribution trait.
+     *
+     * @param distribution Distribution.
+     * @return A map with the {@code keys} attribute (plus {@code cacheId} for an affinity function) for a hash
+     *      distribution, the short name of the distribution type for the other supported types.
+     */
+    private Object toJson(DistributionTrait distribution) {
+        Type type = distribution.getType();
+
+        switch (type) {
+            case HASH_DISTRIBUTED: {
+                Map<String, Object> res = map();
+                List<Object> jsonKeys = list();
+
+                for (Integer distrKey : distribution.getKeys())
+                    jsonKeys.add(toJson(distrKey));
+
+                res.put("keys", jsonKeys);
+
+                DistributionFunction fun = distribution.function();
+
+                if (fun.affinity())
+                    res.put("cacheId", fun.cacheId());
+
+                return res;
+            }
+
+            case ANY:
+            case BROADCAST_DISTRIBUTED:
+            case RANDOM_DISTRIBUTED:
+            case SINGLETON:
+                return type.shortName;
+
+            default:
+                throw new AssertionError("Unexpected distribution type.");
+        }
+    }
+
+    /**
+     * Serializes a collation as a list with one map per field collation, each holding the {@code field},
+     * {@code direction} and {@code nulls} attributes.
+     *
+     * @param node Collation.
+     * @return JSON list.
+     */
+    private Object toJson(RelCollationImpl node) {
+        List<Object> res = list();
+
+        for (RelFieldCollation fc : node.getFieldCollations()) {
+            Map<String, Object> item = map();
+
+            item.put("field", fc.getFieldIndex());
+            item.put("direction", toJson(fc.getDirection()));
+            item.put("nulls", toJson(fc.nullDirection));
+
+            res.add(item);
+        }
+
+        return res;
+    }
+
+    /**
+     * Serializes a window order key as a map with the {@code expr}, {@code direction} and {@code null-direction}
+     * attributes.
+     *
+     * @param collation Order key.
+     * @return JSON map.
+     */
+    private Object toJson(RexFieldCollation collation) {
+        Map<String, Object> res = map();
+
+        res.put("expr", toJson(collation.left));
+        res.put("direction", toJson(collation.getDirection()));
+        res.put("null-direction", toJson(collation.getNullDirection()));
+
+        return res;
+    }
+
+    /**
+     * Serializes a window bound as a map with the {@code type} attribute and, for a bounded variant, the
+     * {@code offset} attribute.
+     *
+     * @param windowBound Window bound.
+     * @return JSON map.
+     */
+    private Object toJson(RexWindowBound windowBound) {
+        Map<String, Object> res = map();
+
+        if (windowBound.isCurrentRow()) {
+            res.put("type", "CURRENT_ROW");
+
+            return res;
+        }
+
+        if (windowBound.isUnbounded()) {
+            res.put("type", windowBound.isPreceding() ? "UNBOUNDED_PRECEDING" : "UNBOUNDED_FOLLOWING");
+
+            return res;
+        }
+
+        // Only a bounded variant has an offset to write.
+        res.put("type", windowBound.isPreceding() ? "PRECEDING" : "FOLLOWING");
+        res.put("offset", toJson(windowBound.getOffset()));
+
+        return res;
+    }
+
+    /**
+     * Serializes an operator as a map with the {@code name}, {@code kind} and {@code syntax} attributes.
+     *
+     * @param operator Operator.
+     * @return JSON map.
+     */
+    private Object toJson(SqlOperator operator) {
+        // User-defined operators are not yet handled.
+        Map<String, Object> map = map();
+
+        map.put("name", operator.getName());
+        map.put("kind", toJson(operator.kind));
+        map.put("syntax", toJson(operator.getSyntax()));
+
+        return map;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
new file mode 100644
index 0000000..0d0a9a3
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.externalize;
+
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.lang.IgniteException;
+
+/** */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class RelJsonReader {
+    /** */
+    private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF = new TypeReference<>() {};
+
+    /** */
+    private final ObjectMapper mapper = new ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+
+    /** */
+    private final RelOptCluster cluster;
+
+    /** */
+    private final RelOptSchema relOptSchema;
+
+    /** */
+    private final RelJson relJson;
+
+    /** */
+    private final Map<String, RelNode> relMap = new LinkedHashMap<>();
+
+    /** */
+    private RelNode lastRel;
+
+    /** */
+    public static <T extends RelNode> T fromJson(PlanningContext ctx, String json) {
+        RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
+
+        return (T)reader.read(json);
+    }
+
+    /** */
+    public RelJsonReader(RelOptCluster cluster, RelOptSchema relOptSchema) {
+        this.cluster = cluster;
+        this.relOptSchema = relOptSchema;
+
+        relJson = new RelJson(cluster);
+    }
+
+    /** */
+    public RelNode read(String s) {
+        try {
+            lastRel = null;
+            Map<String, Object> o = mapper.readValue(s, TYPE_REF);
+            List<Map<String, Object>> rels = (List)o.get("rels");
+            readRels(rels);
+            return lastRel;
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    private void readRels(List<Map<String, Object>> jsonRels) {
+        for (Map<String, Object> jsonRel : jsonRels)
+            readRel(jsonRel);
+    }
+
+    /** */
+    private void readRel(Map<String, Object> jsonRel) {
+        String id = (String)jsonRel.get("id");
+        String type = (String)jsonRel.get("relOp");
+        Function<RelInput, RelNode> factory = relJson.factory(type);
+        RelNode rel = factory.apply(new RelInputImpl(jsonRel));
+        relMap.put(id, rel);
+        lastRel = rel;
+    }
+
+    /** */
+    private class RelInputImpl implements RelInputEx {
+        /** */
+        private final Map<String, Object> jsonRel;
+
+        /** */
+        private RelInputImpl(Map<String, Object> jsonRel) {
+            this.jsonRel = jsonRel;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelOptCluster getCluster() {
+            return cluster;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelTraitSet getTraitSet() {
+            return cluster.traitSet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelOptTable getTable(String table) {
+            List<String> list = getStringList(table);
+            return relOptSchema.getTableForMember(list);
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelNode getInput() {
+            List<RelNode> inputs = getInputs();
+            assert inputs.size() == 1;
+            return inputs.get(0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelNode> getInputs() {
+            List<String> jsonInputs = getStringList("inputs");
+            if (jsonInputs == null)
+                return ImmutableList.of(lastRel);
+            List<RelNode> inputs = new ArrayList<>();
+            for (String jsonInput : jsonInputs)
+                inputs.add(lookupInput(jsonInput));
+            return inputs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RexNode getExpression(String tag) {
+            return relJson.toRex(this, jsonRel.get(tag));
+        }
+
+        /** {@inheritDoc} */
+        @Override public ImmutableBitSet getBitSet(String tag) {
+            return ImmutableBitSet.of(getIntegerList(tag));
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<ImmutableBitSet> getBitSetList(String tag) {
+            List<List<Integer>> list = getIntegerListList(tag);
+            if (list == null)
+                return null;
+            ImmutableList.Builder<ImmutableBitSet> builder =
+                ImmutableList.builder();
+            for (List<Integer> integers : list)
+                builder.add(ImmutableBitSet.of(integers));
+            return builder.build();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<String> getStringList(String tag) {
+            return (List<String>)jsonRel.get(tag);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<Integer> getIntegerList(String tag) {
+            return (List<Integer>)jsonRel.get(tag);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<Integer>> getIntegerListList(String tag) {
+            return (List<List<Integer>>)jsonRel.get(tag);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<AggregateCall> getAggregateCalls(String tag) {
+            List<Map<String, Object>> jsonAggs = (List)jsonRel.get(tag);
+            List<AggregateCall> inputs = new ArrayList<>();
+            for (Map<String, Object> jsonAggCall : jsonAggs)
+                inputs.add(toAggCall(jsonAggCall));
+            return inputs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object get(String tag) {
+            return jsonRel.get(tag);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getString(String tag) {
+            return (String)jsonRel.get(tag);
+        }
+
+        /** {@inheritDoc} */
+        @Override public float getFloat(String tag) {
+            return ((Number)jsonRel.get(tag)).floatValue();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean getBoolean(String tag, boolean default_) {
+            Boolean b = (Boolean)jsonRel.get(tag);
+            return b != null ? b : default_;
+        }
+
+        /** {@inheritDoc} */
+        @Override public <E extends Enum<E>> E getEnum(String tag, Class<E> enumClass) {
+            return Util.enumVal(enumClass,
+                getString(tag).toUpperCase(Locale.ROOT));
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RexNode> getExpressionList(String tag) {
+            List<Object> jsonNodes = (List)jsonRel.get(tag);
+            List<RexNode> nodes = new ArrayList<>();
+            for (Object jsonNode : jsonNodes)
+                nodes.add(relJson.toRex(this, jsonNode));
+            return nodes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType getRowType(String tag) {
+            Object o = jsonRel.get(tag);
+            return relJson.toType(Commons.typeFactory(cluster), o);
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType getRowType(String expressionsTag, String fieldsTag) {
+            List<RexNode> expressionList = getExpressionList(expressionsTag);
+            List<String> names =
+                (List<String>)get(fieldsTag);
+            return Commons.typeFactory(cluster).createStructType(
+                new AbstractList<Map.Entry<String, RelDataType>>() {
+                    @Override public Map.Entry<String, RelDataType> get(int index) {
+                        return Pair.of(names.get(index),
+                            expressionList.get(index).getType());
+                    }
+
+                    @Override public int size() {
+                        return names.size();
+                    }
+                });
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelCollation getCollation() {
+            return relJson.toCollation((List)get("collation"));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelCollation getCollation(String tag) {
+            return relJson.toCollation((List)get(tag));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDistribution getDistribution() {
+            return relJson.toDistribution(get("distribution"));
+        }
+
+        /** {@inheritDoc} */
+        @Override public ImmutableList<ImmutableList<RexLiteral>> getTuples(String tag) {
+            List<List> jsonTuples = (List)get(tag);
+            ImmutableList.Builder<ImmutableList<RexLiteral>> builder =
+                ImmutableList.builder();
+            for (List jsonTuple : jsonTuples)
+                builder.add(getTuple(jsonTuple));
+            return builder.build();
+        }
+
+        /** */
+        private RelNode lookupInput(String jsonInput) {
+            RelNode node = relMap.get(jsonInput);
+            if (node == null)
+                throw new RuntimeException("unknown id " + jsonInput
+                    + " for relational expression");
+            return node;
+        }
+
+        /** */
+        private ImmutableList<RexLiteral> getTuple(List jsonTuple) {
+            ImmutableList.Builder<RexLiteral> builder =
+                ImmutableList.builder();
+            for (Object jsonValue : jsonTuple)
+                builder.add((RexLiteral)relJson.toRex(this, jsonValue));
+            return builder.build();
+        }
+
+        /** */
+        private AggregateCall toAggCall(Map<String, Object> jsonAggCall) {
+            Map<String, Object> aggMap = (Map)jsonAggCall.get("agg");
+            SqlAggFunction aggregation = (SqlAggFunction)relJson.toOp(aggMap);
+            Boolean distinct = (Boolean)jsonAggCall.get("distinct");
+            List<Integer> operands = (List<Integer>)jsonAggCall.get("operands");
+            Integer filterOperand = (Integer)jsonAggCall.get("filter");
+            RelDataType type = relJson.toType(Commons.typeFactory(cluster), jsonAggCall.get("type"));
+            String name = (String)jsonAggCall.get("name");
+            return AggregateCall.create(aggregation, distinct, false, false, operands,
+                filterOperand == null ? -1 : filterOperand,
+                RelCollations.EMPTY,
+                type, name);
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
new file mode 100644
index 0000000..b96ec8d
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.externalize;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Callback for a relational expression to dump itself as JSON.
+ *
+ * <p>Every explained node becomes one map holding its {@code id}, its {@code relOp} and its attributes. The maps are
+ * collected in the order the nodes are explained and rendered by {@link #asString()} under the {@code rels} root name.
+ *
+ * @see RelJsonReader
+ */
+public class RelJsonWriter implements RelWriter {
+    /** Whether {@link #toJson(RelNode)} pretty-prints its output. */
+    private static final boolean PRETTY_PRINT = false;
+        // TODO: IgniteSystemProperties.getBoolean("IGNITE_CALCITE_REL_JSON_PRETTY_PRINT", false);
+
+    /** JSON mapper. It is never reconfigured, so one instance is shared by all writers instead of being built per call. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Converter of Calcite objects into JSON values. */
+    private final RelJson relJson;
+
+    /** JSON descriptions of the explained nodes, in the order they were explained. */
+    private final List<Object> relList = new ArrayList<>();
+
+    /** Ids of the explained nodes; nodes are compared by identity. */
+    private final Map<RelNode, String> relIdMap = new IdentityHashMap<>();
+
+    /** Whether the output is pretty-printed. */
+    private final boolean pretty;
+
+    /** Id of the node explained last. */
+    private String previousId;
+
+    /** Attributes collected through {@link #item(String, Object)} since the last {@link #done(RelNode)} call. */
+    private List<Pair<String, Object>> items = new ArrayList<>();
+
+    /**
+     * Dumps the given relational expression as JSON.
+     *
+     * @param rel Root of the tree.
+     * @return JSON representation.
+     */
+    public static String toJson(RelNode rel) {
+        RelJsonWriter writer = new RelJsonWriter(rel.getCluster(), PRETTY_PRINT);
+        rel.explain(writer);
+
+        return writer.asString();
+    }
+
+    /**
+     * @param cluster Cluster of the nodes to be written.
+     * @param pretty Whether the output is pretty-printed.
+     */
+    public RelJsonWriter(RelOptCluster cluster, boolean pretty) {
+        this.pretty = pretty;
+
+        relJson = new RelJson(cluster);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void explain(RelNode rel, List<Pair<String, Object>> valueList) {
+        explain_(rel, valueList);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SqlExplainLevel getDetailLevel() {
+        return SqlExplainLevel.ALL_ATTRIBUTES;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter item(String term, Object value) {
+        items.add(Pair.of(term, value));
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter done(RelNode node) {
+        // Detach the collected attributes first: explaining the node may recurse into its inputs, which collect their own.
+        List<Pair<String, Object>> current0 = items;
+        items = new ArrayList<>();
+        explain_(node, current0);
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean nest() {
+        return true;
+    }
+
+    /**
+     * Renders everything explained so far.
+     *
+     * @return JSON document with the collected nodes under the {@code rels} root name.
+     * @throws IgniteException If serialization fails.
+     */
+    public String asString() {
+        try {
+            StringWriter writer = new StringWriter();
+
+            ObjectWriter writer0 = pretty
+                ? MAPPER.writer(new DefaultPrettyPrinter())
+                : MAPPER.writer();
+
+            writer0
+                .withRootName("rels")
+                .writeValue(writer, relList);
+
+            return writer.toString();
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Appends the JSON description of a node; inputs that have not been explained yet are explained first.
+     *
+     * @param rel Node.
+     * @param values Attributes of the node.
+     */
+    private void explain_(RelNode rel, List<Pair<String, Object>> values) {
+        final Map<String, Object> map = relJson.map();
+
+        map.put("id", null); // ensure that id is the first attribute
+        map.put("relOp", relJson.classToTypeName(rel.getClass()));
+
+        for (Pair<String, Object> value : values) {
+            // Inputs are written separately, as ids under the "inputs" attribute.
+            if (value.right instanceof RelNode)
+                continue;
+
+            map.put(value.left, relJson.toJson(value.right));
+        }
+        // omit 'inputs: ["3"]' if "3" is the preceding rel
+        final List<Object> list = explainInputs(rel.getInputs());
+        if (list.size() != 1 || !list.get(0).equals(previousId))
+            map.put("inputs", list);
+
+        // The id is assigned only after the inputs were explained, so inputs always get smaller ids.
+        final String id = Integer.toString(relIdMap.size());
+        relIdMap.put(rel, id);
+        map.put("id", id);
+
+        relList.add(map);
+        previousId = id;
+    }
+
+    /**
+     * Resolves the ids of the given inputs, explaining those that have no id yet.
+     *
+     * @param inputs Input nodes.
+     * @return Ids of the inputs, in the order of the inputs.
+     */
+    private List<Object> explainInputs(List<RelNode> inputs) {
+        final List<Object> list = relJson.list();
+        for (RelNode input : inputs) {
+            String id = relIdMap.get(input);
+            if (id == null) {
+                // Explaining the input appends it to the list and leaves its id in previousId.
+                input.explain(this);
+                id = previousId;
+            }
+            list.add(id);
+        }
+        return list;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
new file mode 100644
index 0000000..fa6b1f8
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.IgniteIntList;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
+
+/** Colocation group: ids of query sources together with the nodes and per-partition node assignments they are mapped to. */
+public class ColocationGroup {
+    /** Number of single-node assignments generated by {@link #forNodes0(List)}. */
+    private static final int SYNTHETIC_PARTITIONS_COUNT = 512;
+        // TODO: IgniteSystemProperties.getInteger("IGNITE_CALCITE_SYNTHETIC_PARTITIONS_COUNT", 512);
+
+    /** Ids of sources belonging to this group, may be {@code null}. */
+    private List<Long> sourceIds;
+
+    /** Ids of nodes this group is mapped to, may be {@code null}. */
+    private List<UUID> nodeIds;
+
+    /** Node ids per partition (list index is the partition), may be {@code null}. */
+    private List<List<UUID>> assignments;
+
+    /** Creates a group that has only node ids set. */
+    public static ColocationGroup forNodes(List<UUID> nodeIds) {
+        return new ColocationGroup(null, nodeIds, null);
+    }
+
+    /** Creates a group that has only partition assignments set. */
+    public static ColocationGroup forAssignments(List<List<UUID>> assignments) {
+        return new ColocationGroup(null, null, assignments);
+    }
+
+    /** Creates a group that has only a single source id set. */
+    public static ColocationGroup forSourceId(long sourceId) {
+        return new ColocationGroup(Collections.singletonList(sourceId), null, null);
+    }
+
+    /** Stores the given lists as is (no copies are made); any of them may be {@code null}. */
+    private ColocationGroup(List<Long> sourceIds, List<UUID> nodeIds, List<List<UUID>> assignments) {
+        this.sourceIds = sourceIds;
+        this.nodeIds = nodeIds;
+        this.assignments = assignments;
+    }
+
+    /**
+     * @return Ids of the sources belonging to this group, or an empty list if none are set.
+     */
+    public List<Long> sourceIds() {
+        return sourceIds == null ? Collections.emptyList() : sourceIds;
+    }
+
+    /**
+     * @return Nodes capable of executing the query fragment the mapping is calculated for, or an empty list if none are set.
+     */
+    public List<UUID> nodeIds() {
+        return nodeIds == null ? Collections.emptyList() : nodeIds;
+    }
+
+    /**
+     * @return List of partitions (index) and nodes (items) having an appropriate partition in
+     * OWNING state, calculated for distributed tables, involved in query execution; empty list if not set.
+     */
+    public List<List<UUID>> assignments() {
+        return assignments == null ? Collections.emptyList() : assignments;
+    }
+
+    /**
+     * Prunes involved partitions (hence nodes, involved in query execution) on the basis of filter,
+     * its distribution, query parameters and original nodes mapping.
+     * @param rel Filter.
+     * @return Resulting nodes mapping (currently always {@code this}: pruning is not implemented yet, see the TODO).
+     */
+    public ColocationGroup prune(IgniteRel rel) {
+        return this; // TODO https://issues.apache.org/jira/browse/IGNITE-12455
+    }
+
+    /** Returns {@code true} if the given source id is one of the source ids of this group. */
+    public boolean belongs(long sourceId) {
+        return sourceIds != null && sourceIds.contains(sourceId);
+    }
+
+    /**
+     * Merges this mapping with the given one: source ids are combined, node ids and per-partition assignments are intersected.
+     * @param other Mapping to merge with.
+     * @return Merged nodes mapping.
+     * @throws ColocationMappingException If the intersection of involved nodes (or of the owners of some partition) is empty,
+     * i.e. there are no nodes capable of executing the fragment being calculated.
+     */
+    public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingException {
+        List<Long> sourceIds;
+        if (this.sourceIds == null || other.sourceIds == null)
+            sourceIds = firstNotNull(this.sourceIds, other.sourceIds);
+        else
+            sourceIds = Commons.combine(this.sourceIds, other.sourceIds);
+
+        List<UUID> nodeIds;
+        if (this.nodeIds == null || other.nodeIds == null)
+            nodeIds = firstNotNull(this.nodeIds, other.nodeIds);
+        else
+            nodeIds = Commons.intersect(other.nodeIds, this.nodeIds);
+
+        if (nodeIds != null && nodeIds.isEmpty()) {
+            throw new ColocationMappingException("Failed to map fragment to location. " +
+                "Replicated query parts are not co-located on all nodes");
+        }
+
+        List<List<UUID>> assignments;
+        if (this.assignments == null || other.assignments == null) {
+            assignments = firstNotNull(this.assignments, other.assignments);
+
+            if (assignments != null && nodeIds != null) { // only one side has assignments: restrict them to the merged nodes
+                Set<UUID> filter = new HashSet<>(nodeIds);
+                List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
+
+                for (int i = 0; i < assignments.size(); i++) {
+                    List<UUID> assignment = Commons.intersect(filter, assignments.get(i));
+
+                    if (assignment.isEmpty()) { // TODO check with partition filters
+                        throw new ColocationMappingException("Failed to map fragment to location. " +
+                            "Partition mapping is empty [part=" + i + "]");
+                    }
+
+                    assignments0.add(assignment);
+                }
+
+                assignments = assignments0;
+            }
+        }
+        else {
+            assert this.assignments.size() == other.assignments.size();
+            assignments = new ArrayList<>(this.assignments.size());
+            Set<UUID> filter = nodeIds == null ? null : new HashSet<>(nodeIds);
+            for (int i = 0; i < this.assignments.size(); i++) { // both sides have assignments: intersect them partition by partition
+                List<UUID> assignment = Commons.intersect(this.assignments.get(i), other.assignments.get(i));
+
+                if (filter != null)
+                    assignment.retainAll(filter);
+
+                if (assignment.isEmpty()) // TODO check with partition filters
+                    throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
+
+                assignments.add(assignment);
+            }
+        }
+
+        return new ColocationGroup(sourceIds, nodeIds, assignments);
+    }
+
+    /** Keeps only the first node of every partition assignment; a group with nodes only gets synthetic assignments via {@link #forNodes0(List)}. */
+    public ColocationGroup finalaze() {
+        if (assignments == null && nodeIds == null)
+            return this;
+
+        if (assignments != null) {
+            List<List<UUID>> assignments = new ArrayList<>(this.assignments.size());
+            Set<UUID> nodes = new HashSet<>();
+            for (List<UUID> assignment : this.assignments) {
+                UUID first = first(assignment);
+                if (first != null)
+                    nodes.add(first);
+                assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList());
+            }
+
+            return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments); // node ids become the set of chosen first nodes
+        }
+
+        return forNodes0(nodeIds);
+    }
+
+    /** Returns this group if it already has node ids, otherwise a group mapped to the given nodes via {@link #forNodes0(List)}. */
+    public ColocationGroup mapToNodes(List<UUID> nodeIds) {
+        return !nullOrEmpty(this.nodeIds) ? this : forNodes0(nodeIds);
+    }
+
+    /** Creates a group with {@code SYNTHETIC_PARTITIONS_COUNT} single-node assignments, cycling through the given nodes. */
+    @NotNull private ColocationGroup forNodes0(List<UUID> nodeIds) {
+        List<List<UUID>> assignments = new ArrayList<>(SYNTHETIC_PARTITIONS_COUNT);
+        for (int i = 0; i < SYNTHETIC_PARTITIONS_COUNT; i++)
+            assignments.add(asList(nodeIds.get(i % nodeIds.size()))); // nodeIds must be non-empty: modulo by its size
+        return new ColocationGroup(sourceIds, nodeIds, assignments);
+    }
+
+    /**
+     * Returns partitions to scan on the given node, i.e. partitions whose first assigned node is the given one.
+     *
+     * @param nodeId Cluster node ID.
+     * @return Indexes of partitions to scan on the given node.
+     */
+    public int[] partitions(UUID nodeId) {
+        IgniteIntList parts = new IgniteIntList(assignments.size()); // reads the field directly: assignments must be set
+
+        for (int i = 0; i < assignments.size(); i++) {
+            List<UUID> assignment = assignments.get(i);
+            if (Objects.equals(nodeId, first(assignment)))
+                parts.add(i);
+        }
+
+        return parts.array();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationMappingException.java
similarity index 65%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationMappingException.java
index 01746a7..a1be2b7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationMappingException.java
@@ -14,22 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
 
-/** Stubs */
-public class Stubs {
-    /** */
-    public static int intFoo(Object... args) {
-        return args == null ? 0 : args.length;
-    }
-
-    /** */
-    public static boolean boolFoo(Object... args) {
-        return args == null;
-    }
+package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-    /** */
-    public static String stringFoo(Object... args) {
-        return args == null ? "null" : "not null";
+/**
+ * Checked exception thrown when colocation groups cannot be merged: no common node or no common owner of a partition.
+ */
+public class ColocationMappingException extends Exception {
+    /**
+     * @param message Message.
+     */
+    public ColocationMappingException(String message) {
+        super(message);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
new file mode 100644
index 0000000..5d8f2ad
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
+
+/**
+ * Mapping of a query fragment, represented as a list of {@link ColocationGroup colocation groups}.
+ */
+public class FragmentMapping {
+    /** Colocation groups of this mapping. */
+    private List<ColocationGroup> colocationGroups;
+
+    /** Creates a mapping without initializing colocation groups (the field stays {@code null}). */
+    public FragmentMapping() {
+    }
+
+    /** Creates a mapping consisting of the single given colocation group. */
+    private FragmentMapping(ColocationGroup colocationGroup) {
+        this(asList(colocationGroup));
+    }
+
+    /** Creates a mapping over the given colocation groups (the list is stored as is). */
+    private FragmentMapping(List<ColocationGroup> colocationGroups) {
+        this.colocationGroups = colocationGroups;
+    }
+
+    /** Creates a mapping with no colocation groups. */
+    public static FragmentMapping create() {
+        return new FragmentMapping(Collections.emptyList());
+    }
+
+    /** Creates a mapping with a single group mapped to the given node. */
+    public static FragmentMapping create(UUID nodeId) {
+        return new FragmentMapping(ColocationGroup.forNodes(Collections.singletonList(nodeId)));
+    }
+
+    /** Creates a mapping with a single group that contains only the given source id. */
+    public static FragmentMapping create(long sourceId) {
+        return new FragmentMapping(ColocationGroup.forSourceId(sourceId));
+    }
+
+    /** Creates a mapping with a single group: the given group colocated with the given source id. */
+    public static FragmentMapping create(long sourceId, ColocationGroup group) {
+        try {
+            return new FragmentMapping(ColocationGroup.forSourceId(sourceId).colocate(group));
+        }
+        catch (ColocationMappingException e) {
+            throw new AssertionError(e); // Cannot happen
+        }
+    }
+
+    /** Returns {@code true} if this mapping has at most one colocation group. */
+    public boolean colocated() {
+        return colocationGroups.isEmpty() || colocationGroups.size() == 1;
+    }
+
+    /** Prunes the only colocation group by the given rel; a mapping with any other number of groups is returned as is. */
+    public FragmentMapping prune(IgniteRel rel) {
+        if (colocationGroups.size() != 1)
+            return this;
+
+        return new FragmentMapping(first(colocationGroups).prune(rel));
+    }
+
+    /** Creates a mapping from the groups of this and the other mapping joined by {@code Commons.combine}. */
+    public FragmentMapping combine(FragmentMapping other) {
+        return new FragmentMapping(Commons.combine(colocationGroups, other.colocationGroups));
+    }
+
+    /** Colocates the groups of two colocated mappings; throws {@link ColocationMappingException} if the groups cannot be merged. */
+    public FragmentMapping colocate(FragmentMapping other) throws ColocationMappingException {
+        assert colocated() && other.colocated();
+
+        ColocationGroup first = first(colocationGroups);
+        ColocationGroup second = first(other.colocationGroups);
+
+        if (first == null && second == null)
+            return this;
+        else if (first == null || second == null)
+            return new FragmentMapping(firstNotNull(first, second));
+        else
+            return new FragmentMapping(first.colocate(second));
+    }
+
+    /** Returns distinct node ids of all colocation groups of this mapping. */
+    public List<UUID> nodeIds() {
+        return colocationGroups.stream()
+            .flatMap(g -> g.nodeIds().stream())
+            .distinct().collect(Collectors.toList());
+    }
+
+    /** Finalizes every group and maps groups to the nodes of this mapping or, if it has none, to nodes from {@code nodesSource}. */
+    public FragmentMapping finalize(Supplier<List<UUID>> nodesSource) {
+        if (colocationGroups.isEmpty())
+            return this;
+
+        List<ColocationGroup> colocationGroups = this.colocationGroups;
+
+        colocationGroups = Commons.transform(colocationGroups, ColocationGroup::finalaze);
+        List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ? nodesSource.get() : nodes; // nodeIds() reads the field, not the local list
+        colocationGroups = Commons.transform(colocationGroups, g -> g.mapToNodes(nodes0));
+
+        return new FragmentMapping(colocationGroups);
+    }
+
+    /** Returns the only group the given source id belongs to; throws {@link IllegalStateException} if none or several are found. */
+    public @NotNull ColocationGroup findGroup(long sourceId) {
+        List<ColocationGroup> groups = colocationGroups.stream()
+            .filter(c -> c.belongs(sourceId))
+            .collect(Collectors.toList());
+
+        if (groups.isEmpty())
+            throw new IllegalStateException("Failed to find group with given id. [sourceId=" + sourceId + "]");
+        else if (groups.size() > 1)
+            throw new IllegalStateException("Multiple groups with the same id found. [sourceId=" + sourceId + "]");
+
+        return first(groups);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMappingException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMappingException.java
new file mode 100644
index 0000000..19a0d75
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMappingException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+
+/**
+ * Unchecked exception that carries the fragment and the plan node the mapping failure relates to.
+ */
+public class FragmentMappingException extends RuntimeException {
+    /** Fragment of a query plan, where the exception was thrown. */
+    private final Fragment fragment;
+
+    /** Node of a query plan, where the exception was thrown. */
+    private final RelNode node;
+
+    /**
+     * @param message Message.
+     * @param fragment Fragment of a query plan, where the exception was thrown.
+     * @param node Node of a query plan, where the exception was thrown.
+     * @param cause Cause.
+     */
+    public FragmentMappingException(String message, Fragment fragment, RelNode node, Throwable cause) {
+        super(message, cause);
+        this.fragment = fragment;
+        this.node = node;
+    }
+
+    /**
+     * @return Fragment of a query plan, where the exception was thrown.
+     */
+    public Fragment fragment() {
+        return fragment;
+    }
+
+    /**
+     * @return Node of a query plan, where the exception was thrown.
+     */
+    public RelNode node() {
+        return node;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
new file mode 100644
index 0000000..e16ce58
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMappingMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableFunctionScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+
+/**
+ * Implementation class for {@link RelMetadataQueryEx#fragmentMapping(RelNode)} method call.
+ */
+public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingMetadata> {
+    /**
+     * Metadata provider, responsible for nodes mapping request. It uses this implementation class under the hood.
+     */
+    public static final RelMetadataProvider SOURCE =
+        ReflectiveRelMetadataProvider.reflectiveSource(
+            IgniteMethod.FRAGMENT_MAPPING.method(), new IgniteMdFragmentMapping());
+
+    /** {@inheritDoc} */
+    @Override public MetadataDef<FragmentMappingMetadata> getDef() {
+        return FragmentMappingMetadata.DEF;
+    }
+
+    /**
+     * Requests meta information about nodes capable of executing a query over particular partitions.
+     *
+     * @param rel Relational node.
+     * @param mq Metadata query instance. Used to request appropriate metadata from node children.
+     * @return Nodes mapping, representing a list of nodes capable of executing a query over particular partitions.
+     */
+    public FragmentMapping fragmentMapping(RelNode rel, RelMetadataQuery mq) {
+        throw new AssertionError(); // generic fallback: presumably a more specific overload must handle every rel - TODO confirm
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     */
+    public FragmentMapping fragmentMapping(RelSubset rel, RelMetadataQuery mq) {
+        throw new AssertionError(); // mapping is never calculated for a RelSubset here
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     */
+    public FragmentMapping fragmentMapping(SingleRel rel, RelMetadataQuery mq) {
+        return _fragmentMapping(rel.getInput(), mq);
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     *
+     * {@link ColocationMappingException} may be thrown when the locations of the two children nodes are merged. This means
+     * that the fragment (which the parent node is a part of) cannot be executed on any node and an additional exchange
+     * is needed. In this case we throw {@link NodeMappingException} with the edge where the additional exchange
+     * is needed. After the exchange is put into the fragment and the fragment is split into two, fragment meta
+     * information will be recalculated for all fragments.
+     */
+    public FragmentMapping fragmentMapping(BiRel rel, RelMetadataQuery mq) {
+        RelNode left = rel.getLeft();
+        RelNode right = rel.getRight();
+
+        FragmentMapping fLeft = _fragmentMapping(left, mq);
+        FragmentMapping fRight = _fragmentMapping(right, mq);
+
+        try {
+            return fLeft.colocate(fRight);
+        }
+        catch (ColocationMappingException e) {
+            IgniteExchange lExch = new IgniteExchange(rel.getCluster(), left.getTraitSet(), left, TraitUtils.distribution(left));
+            IgniteExchange rExch = new IgniteExchange(rel.getCluster(), right.getTraitSet(), right, TraitUtils.distribution(right));
+
+            RelNode lVar = rel.copy(rel.getTraitSet(), ImmutableList.of(lExch, right)); // variant with an exchange over the left input
+            RelNode rVar = rel.copy(rel.getTraitSet(), ImmutableList.of(left, rExch)); // variant with an exchange over the right input
+
+            RelOptCost lVarCost = mq.getCumulativeCost(lVar);
+            RelOptCost rVarCost = mq.getCumulativeCost(rVar);
+
+            if (lVarCost.isLt(rVarCost)) // report the input whose exchange variant is cheaper
+                throw new NodeMappingException("Failed to calculate physical distribution", left, e);
+            else
+                throw new NodeMappingException("Failed to calculate physical distribution", right, e);
+        }
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     *
+     * {@link ColocationMappingException} may be thrown when the locations of the children nodes are merged. This means
+     * that the fragment (which the parent node is a part of) cannot be executed on any node and an additional exchange
+     * is needed. In this case we throw {@link NodeMappingException} with the edge where the additional exchange
+     * is needed. After the exchange is put into the fragment and the fragment is split into two, fragment meta
+     * information will be recalculated for all fragments.
+     */
+    public FragmentMapping fragmentMapping(SetOp rel, RelMetadataQuery mq) {
+        FragmentMapping res = null;
+
+        if (TraitUtils.distribution(rel) == IgniteDistributions.random()) { // random distribution: input mappings are combined, not colocated
+            for (RelNode input : rel.getInputs())
+                res = res == null ? _fragmentMapping(input, mq) : res.combine(_fragmentMapping(input, mq));
+        }
+        else {
+            for (RelNode input : rel.getInputs()) {
+                try {
+                    res = res == null ? _fragmentMapping(input, mq) : res.colocate(_fragmentMapping(input, mq));
+                }
+                catch (ColocationMappingException e) {
+                    throw new NodeMappingException("Failed to calculate physical distribution", input, e);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     *
+     * Prunes involved partitions (hence nodes, involved in query execution) if possible.
+     */
+    public FragmentMapping fragmentMapping(IgniteFilter rel, RelMetadataQuery mq) {
+        return _fragmentMapping(rel.getInput(), mq).prune(rel);
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     *
+     * Colocates a mapping created for the source id of the trim exchange with the mapping of its input.
+     */
+    public FragmentMapping fragmentMapping(IgniteTrimExchange rel, RelMetadataQuery mq) {
+        try {
+            return FragmentMapping.create(rel.sourceId())
+                .colocate(_fragmentMapping(rel.getInput(), mq));
+        }
+        catch (ColocationMappingException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     */
+    public FragmentMapping fragmentMapping(IgniteReceiver rel, RelMetadataQuery mq) {
+        return FragmentMapping.create(rel.exchangeId());
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     */
+    public FragmentMapping fragmentMapping(IgniteIndexScan rel, RelMetadataQuery mq) {
+        return FragmentMapping.create(rel.sourceId(),
+            rel.getTable().unwrap(IgniteTable.class).colocationGroup(Commons.context(rel)));
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     */
+    public FragmentMapping fragmentMapping(IgniteTableScan rel, RelMetadataQuery mq) {
+        return FragmentMapping.create(rel.sourceId(),
+            rel.getTable().unwrap(IgniteTable.class).colocationGroup(Commons.context(rel)));
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     */
+    public FragmentMapping fragmentMapping(IgniteValues rel, RelMetadataQuery mq) {
+        return FragmentMapping.create();
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+     */
+    public FragmentMapping fragmentMapping(IgniteTableFunctionScan rel, RelMetadataQuery mq) {
+        return FragmentMapping.create();
+    }
+
+    /**
+     * Fragment info calculation entry point.
+     * @param rel Root node of a calculated fragment.
+     * @param mq Metadata query instance, expected to be a {@link RelMetadataQueryEx}.
+     * @return Fragment meta information.
+     */
+    public static FragmentMapping _fragmentMapping(RelNode rel, RelMetadataQuery mq) {
+        assert mq instanceof RelMetadataQueryEx;
+
+        return ((RelMetadataQueryEx) mq).fragmentMapping(rel);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 9ad0a11..1b71fc1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -18,17 +18,27 @@
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
 import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
+import org.apache.calcite.rel.metadata.Metadata;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
 /**
  * Utility class, holding metadata related interfaces and metadata providers.
  */
 public class IgniteMetadata {
+    /** */
     public static final RelMetadataProvider METADATA_PROVIDER =
         ChainedRelMetadataProvider.of(
             ImmutableList.of(
+                // Ignite specific providers
+                IgniteMdFragmentMapping.SOURCE,
+
                 // Ignite overriden providers
                 IgniteMdDistribution.SOURCE,
                 IgniteMdPercentageOriginalRows.SOURCE,
@@ -42,4 +52,18 @@ public class IgniteMetadata {
 
                 // Basic providers
                 DefaultRelMetadataProvider.INSTANCE));
+
+    /** Metadata type exposing the {@link FragmentMapping} of a relational node. */
+    public interface FragmentMappingMetadata extends Metadata {
+        MetadataDef<FragmentMappingMetadata> DEF = MetadataDef.of(FragmentMappingMetadata.class,
+            FragmentMappingMetadata.Handler.class, IgniteMethod.FRAGMENT_MAPPING.method());
+
+        /** Returns the fragment mapping. */
+        FragmentMapping fragmentMapping();
+
+        /** Handler API. */
+        interface Handler extends MetadataHandler<FragmentMappingMetadata> {
+            FragmentMapping fragmentMapping(RelNode r, RelMetadataQuery mq);
+        }
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
new file mode 100644
index 0000000..e59dbfe
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Service responsible for calculating node mappings.
+ */
+public interface MappingService {
+    /**
+     * Returns nodes responsible for executing intermediate fragments (fragments without Scan leaves). Such fragments may be executed
+     * on any cluster node; the actual list of nodes is chosen on the basis of the adopted selection strategy (using the node filter).
+     *
+     * @param topVer Topology version.
+     * @param single Flag indicating that a fragment should be executed on a single node.
+     * @param nodeFilter Node filter, may be {@code null}.
+     * @return Nodes mapping for intermediate fragments.
+     */
+    List<UUID> executionNodes(long topVer, boolean single, @Nullable Predicate<ClusterNode> nodeFilter);
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodeMappingException.java
similarity index 56%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodeMappingException.java
index 01746a7..6640e30 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodeMappingException.java
@@ -14,22 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
 
-/** Stubs */
-public class Stubs {
-    /** */
-    public static int intFoo(Object... args) {
-        return args == null ? 0 : args.length;
-    }
+package org.apache.ignite.internal.processors.query.calcite.metadata;
 
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Exception that carries the node of a query plan where it was thrown.
+ */
+public class NodeMappingException extends RuntimeException {
     /** */
-    public static boolean boolFoo(Object... args) {
-        return args == null;
+    private final RelNode node;
+
+    /**
+     *
+     * @param message Message.
+     * @param node Node of a query plan, where the exception was thrown.
+     * @param cause Cause.
+     */
+    public NodeMappingException(String message, RelNode node, Throwable cause) {
+        super(message, cause);
+        this.node = node;
     }
 
-    /** */
-    public static String stringFoo(Object... args) {
-        return args == null ? "null" : "not null";
+    /**
+     * @return Node of a query plan, where the exception was thrown.
+     */
+    public RelNode node() {
+        return node;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index d405804..1758dd3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -51,6 +51,13 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
         JaninoRelMetadataProvider.DEFAULT.register(types);
     }
 
+    /** Initial handler for the fragment mapping metadata. */
+    private static final IgniteMetadata.FragmentMappingMetadata.Handler SOURCE_DISTRIBUTION_INITIAL_HANDLER =
+        initialHandler(IgniteMetadata.FragmentMappingMetadata.Handler.class);
+
+    /** Current fragment mapping handler; replaced with a revised one when {@code NoHandler} is thrown. */
+    private IgniteMetadata.FragmentMappingMetadata.Handler sourceDistributionHandler;
+
     /**
      * Factory method.
      *
@@ -74,4 +81,25 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
             THREAD_PROVIDERS.remove();
         }
     }
+
+    /** Creates a query whose fragment mapping handler starts from the initial handler. */
+    private RelMetadataQueryEx() {
+        sourceDistributionHandler = SOURCE_DISTRIBUTION_INITIAL_HANDLER;
+    }
+
+    /**
+     * Calculates data location mapping for a query fragment the given relation node is a root of.
+     * Retries with a revised handler whenever {@code NoHandler} is thrown.
+     * @param rel Relational node.
+     * @return Fragment meta information.
+     */
+    public FragmentMapping fragmentMapping(RelNode rel) {
+        for (;;) {
+            try {
+                return sourceDistributionHandler.fragmentMapping(rel, this);
+            } catch (JaninoRelMetadataProvider.NoHandler e) {
+                sourceDistributionHandler = revise(e.relClass, IgniteMetadata.FragmentMappingMetadata.DEF);
+            }
+        }
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
new file mode 100644
index 0000000..2bfba75
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.IgniteUtils.newHashMap;
+
+/**
+ * Base implementation of a multi-step plan built from a query template.
+ */
+public abstract class AbstractMultiStepPlan implements MultiStepPlan {
+    /** Fields metadata. */
+    protected final Object fieldsMetadata;
+
+    /** Query template the execution plan is produced from. */
+    protected final QueryTemplate queryTemplate;
+
+    /** Execution plan; assigned by {@link #init(PlanningContext)}. */
+    protected ExecutionPlan executionPlan;
+
+    /** Creates a plan for the given query template and fields metadata. */
+    protected AbstractMultiStepPlan(QueryTemplate queryTemplate, Object fieldsMetadata) {
+        this.queryTemplate = queryTemplate;
+        this.fieldsMetadata = fieldsMetadata;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Fragment> fragments() {
+        return Objects.requireNonNull(executionPlan).fragments();
+    }
+
+    /** {@inheritDoc} */
+    @Override public FragmentMapping mapping(Fragment fragment) {
+        return mapping(fragment.fragmentId());
+    }
+
+    /** {@inheritDoc} */
+    @Override public ColocationGroup target(Fragment fragment) {
+        if (fragment.rootFragment())
+            return null;
+
+        IgniteSender sender = (IgniteSender)fragment.root();
+        return mapping(sender.targetFragmentId()).findGroup(sender.exchangeId());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Long, List<UUID>> remotes(Fragment fragment) {
+        List<IgniteReceiver> remotes = fragment.remotes();
+
+        if (nullOrEmpty(remotes))
+            return null;
+
+        HashMap<Long, List<UUID>> res = newHashMap(remotes.size());
+
+        for (IgniteReceiver remote : remotes)
+            res.put(remote.exchangeId(), mapping(remote.sourceFragmentId()).nodeIds());
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void init(PlanningContext ctx) {
+        executionPlan = queryTemplate.map(ctx);
+    }
+
+    /** Looks up the mapping of the fragment with the given ID; fails if there is no such fragment. */
+    private FragmentMapping mapping(long fragmentId) {
+        return Objects.requireNonNull(executionPlan).fragments().stream()
+            .filter(f -> f.fragmentId() == fragmentId)
+            .findAny().orElseThrow(() -> new IllegalStateException("Cannot find fragment with given ID. [" +
+                "fragmentId=" + fragmentId + ", " +
+                "fragments=" + fragments() + "]"))
+            .mapping();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
new file mode 100644
index 0000000..eda9fd9
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableFunctionScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+
+/** Visitor that clones a tree of Ignite relational nodes, associating the copies with a given cluster. */
+public class Cloner implements IgniteRelVisitor<IgniteRel> {
+    /** Cluster the cloned nodes are associated with. */
+    private final RelOptCluster cluster;
+
+    /** Collector of cloned receivers; non-null only while {@link #go(Fragment)} is running. */
+    private ImmutableList.Builder<IgniteReceiver> remotes;
+
+    /** Creates a cloner that binds cloned nodes to the given cluster. */
+    Cloner(RelOptCluster cluster) {
+        this.cluster = cluster;
+    }
+
+    /**
+     * Clones and associates a plan with a new cluster.
+     *
+     * @param src Fragment to clone.
+     * @return New plan.
+     */
+    public Fragment go(Fragment src) {
+        try {
+            remotes = ImmutableList.builder();
+
+            IgniteRel newRoot = visit(src.root());
+            ImmutableList<IgniteReceiver> remotes = this.remotes.build();
+
+            return new Fragment(src.fragmentId(), newRoot, remotes, src.serialized(), src.mapping());
+        }
+        finally {
+            remotes = null;
+        }
+    }
+
+    /** Clones the given node tree, keeping the clone in the cluster of the given node. */
+    public static IgniteRel clone(IgniteRel r) {
+        Cloner c = new Cloner(r.getCluster());
+
+        return c.visit(r);
+    }
+
+    /** Adds the receiver to the remotes collector, if one is active, and returns the receiver. */
+    private IgniteReceiver collect(IgniteReceiver receiver) {
+        if (remotes != null)
+            remotes.add(receiver);
+
+        return receiver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSender rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteFilter rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTrimExchange rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteProject rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableModify rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteNestedLoopJoin rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
+            visit((IgniteRel) rel.getRight())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
+            visit((IgniteRel) rel.getRight())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteMergeJoin rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
+            visit((IgniteRel) rel.getRight())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteIndexScan rel) {
+        return rel.clone(cluster, asList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableScan rel) {
+        return rel.clone(cluster, asList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteValues rel) {
+        return rel.clone(cluster, asList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteUnionAll rel) {
+        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSort rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableSpool rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSortedIndexSpool rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteLimit rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteReceiver rel) {
+        return collect((IgniteReceiver)rel.clone(cluster, asList()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteExchange rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSingleHashAggregate rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteMapHashAggregate rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteReduceHashAggregate rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSingleSortAggregate rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteMapSortAggregate rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteReduceSortAggregate rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteHashIndexSpool rel) {
+        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSingleMinus rel) {
+        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteMapMinus rel) {
+        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteReduceMinus rel) {
+        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableFunctionScan rel) {
+        return rel.clone(cluster, asList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteRel rel) {
+        return rel.accept(this);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
similarity index 61%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
index 01746a7..53dc4fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
@@ -14,22 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
 
-/** Stubs */
-public class Stubs {
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Holds a topology version together with the list of query fragments.
+ */
+class ExecutionPlan {
+    /** Topology version. */
+    private final long ver;
+
+    /** Fragments of the plan. */
+    private final ImmutableList<Fragment> fragments;
+
     /** */
-    public static int intFoo(Object... args) {
-        return args == null ? 0 : args.length;
+    ExecutionPlan(long ver, List<Fragment> fragments) {
+        this.ver = ver;
+        this.fragments = ImmutableList.copyOf(fragments);
     }
 
     /** */
-    public static boolean boolFoo(Object... args) {
-        return args == null;
+    public long topologyVersion() {
+        return ver;
     }
 
     /** */
-    public static String stringFoo(Object... args) {
-        return args == null ? "null" : "not null";
+    public List<Fragment> fragments() {
+        return fragments;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
new file mode 100644
index 0000000..aaba73c
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodeMappingException;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
+
+/**
+ * Fragment of a distributed query.
+ */
+public class Fragment {
+    /** Fragment id. */
+    private final long id;
+
+    /** Root node of the fragment. */
+    private final IgniteRel root;
+
+    /** Serialized root representation. */
+    @IgniteToStringExclude
+    private final String rootSer;
+
+    /** Fragment mapping; {@code null} until the fragment is mapped. */
+    private final FragmentMapping mapping;
+
+    /** Remote sources of the fragment. */
+    private final ImmutableList<IgniteReceiver> remotes;
+
+    /**
+     * @param id Fragment id.
+     * @param root Root node of the fragment.
+     * @param remotes Remote sources of the fragment.
+     */
+    public Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes) {
+        this(id, root, remotes, null, null);
+    }
+
+    /** Creates a fragment; the root is serialized to JSON when {@code rootSer} is {@code null}. */
+    Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer, @Nullable FragmentMapping mapping) {
+        this.id = id;
+        this.root = root;
+        this.remotes = ImmutableList.copyOf(remotes);
+        this.rootSer = rootSer != null ? rootSer : toJson(root);
+        this.mapping = mapping;
+    }
+
+    /**
+     * @return Fragment ID.
+     */
+    public long fragmentId() {
+        return id;
+    }
+
+    /**
+     * @return Root node.
+     */
+    public IgniteRel root() {
+        return root;
+    }
+
+    /**
+     * Serialized root representation, prepared when the fragment is constructed.
+     *
+     * @return Serialized form.
+     */
+    public String serialized() {
+        return rootSer;
+    }
+
+    /** @return Fragment mapping, or {@code null} if the fragment has not been mapped. */
+    public FragmentMapping mapping() {
+        return mapping;
+    }
+
+    /**
+     * @return Fragment remote sources.
+     */
+    public List<IgniteReceiver> remotes() {
+        return remotes;
+    }
+
+    /** @return {@code true} if the root of this fragment is not a sender. */
+    public boolean rootFragment() {
+        return !(root instanceof IgniteSender);
+    }
+
+    /** Returns this fragment if its root belongs to the context cluster, otherwise a clone bound to that cluster. */
+    public Fragment attach(PlanningContext ctx) {
+        RelOptCluster cluster = ctx.cluster();
+
+        return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
+    }
+
+    /** Returns this fragment if its root belongs to the empty context cluster, otherwise a clone bound to it. */
+    public Fragment detach() {
+        RelOptCluster cluster = PlanningContext.empty().cluster();
+
+        return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
+    }
+
+    /**
+     * Maps the fragment to its data location.
+     * @param ctx Planner context.
+     * @param mq Metadata query.
+     */
+    Fragment map(MappingService mappingSrvc, PlanningContext ctx, RelMetadataQuery mq) throws FragmentMappingException {
+        assert root.getCluster() == ctx.cluster() : "Fragment is detached [fragment=" + this + "]";
+
+        if (mapping != null)
+            return this;
+
+        return new Fragment(id, root, remotes, rootSer, mapping(ctx, mq, nodesSource(mappingSrvc, ctx)));
+    }
+
+    /** Calculates the fragment mapping, wrapping mapping failures into {@link FragmentMappingException}. */
+    private FragmentMapping mapping(PlanningContext ctx, RelMetadataQuery mq, Supplier<List<UUID>> nodesSource) {
+        try {
+            FragmentMapping mapping = IgniteMdFragmentMapping._fragmentMapping(root, mq);
+
+            if (rootFragment())
+                mapping = FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
+
+            if (single() && mapping.nodeIds().size() > 1) {
+                // this is possible when the fragment contains scan of a replicated cache, which brings
+                // several nodes (actually all containing nodes) to the colocation group, but this fragment
+                // supposed to be executed on a single node, so let's choose one wisely
+                mapping = FragmentMapping.create(mapping.nodeIds()
+                    .get(ThreadLocalRandom.current().nextInt(mapping.nodeIds().size()))).colocate(mapping);
+            }
+
+            return mapping.finalize(nodesSource);
+        }
+        catch (NodeMappingException e) {
+            throw new FragmentMappingException("Failed to calculate physical distribution", this, e.node(), e);
+        }
+        catch (ColocationMappingException e) {
+            throw new FragmentMappingException("Failed to calculate physical distribution", this, root, e);
+        }
+    }
+
+    /** Returns a supplier that asks the mapping service for execution nodes at the context topology version. */
+    @NotNull private Supplier<List<UUID>> nodesSource(MappingService mappingSrvc, PlanningContext ctx) {
+        return () -> mappingSrvc.executionNodes(ctx.topologyVersion(), single(), null);
+    }
+
+    /** @return {@code true} if the root is a sender whose source distribution satisfies the single distribution. */
+    private boolean single() {
+        return root instanceof IgniteSender
+            && ((IgniteSender)root).sourceDistribution().satisfies(IgniteDistributions.single());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(Fragment.class, this, "root", RelOptUtil.toString(root));
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
new file mode 100644
index 0000000..a9d31ca
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Splits a fragment into several fragments by replacing a given cut point with a sender/receiver pair.
+ */
+public class FragmentSplitter extends IgniteRelShuttle {
+    /** Fragment prototypes that still have to be traversed. */
+    private final Deque<FragmentProto> stack = new LinkedList<>();
+
+    /** Node at which the fragment is cut; reset to {@code null} once the cut has been made. */
+    private RelNode cutPoint;
+
+    /** Fragment prototype that is currently being traversed. */
+    private FragmentProto curr;
+
+    /** @param cutPoint Node at which the fragment has to be cut. */
+    public FragmentSplitter(RelNode cutPoint) {
+        this.cutPoint = cutPoint;
+    }
+
+    /** Traverses the given fragment and returns the resulting fragments; every one of them gets a freshly generated ID. */
+    public List<Fragment> go(Fragment fragment) {
+        ArrayList<Fragment> res = new ArrayList<>();
+
+        stack.push(new FragmentProto(IdGenerator.nextId(), fragment.root()));
+
+        while (!stack.isEmpty()) {
+            curr = stack.pop();
+            curr.root = visit(curr.root);
+            res.add(curr.build());
+            curr = null;
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} Registers the receiver among the remotes of the current fragment and returns it unchanged. */
+    @Override public IgniteRel visit(IgniteReceiver rel) {
+        curr.remotes.add(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} Always throws {@link AssertionError}. */
+    @Override public IgniteRel visit(IgniteExchange rel) {
+        throw new AssertionError();
+    }
+
+    /**
+     * Splits the tree if {@code rel} is the cut point, otherwise visits all children of {@code rel}.
+     */
+    @Override protected IgniteRel processNode(IgniteRel rel) {
+        if (rel == cutPoint) {
+            cutPoint = null;
+
+            return split(rel);
+        }
+
+        List<IgniteRel> inputs = Commons.cast(rel.getInputs());
+
+        for (int i = 0; i < inputs.size(); i++)
+            visitChild(rel, i, inputs.get(i));
+
+        return rel;
+    }
+
+    /** Returns a receiver replacing {@code rel}; queues a fragment rooted at a sender over it (or a trim exchange's input). */
+    private IgniteRel split(IgniteRel rel) {
+        RelOptCluster cluster = rel.getCluster();
+        RelTraitSet traits = rel.getTraitSet();
+        RelDataType rowType = rel.getRowType();
+
+        RelNode input = rel instanceof IgniteTrimExchange ? rel.getInput(0) : rel;
+
+        long targetFragmentId = curr.id;
+        long sourceFragmentId = IdGenerator.nextId();
+        long exchangeId = sourceFragmentId;
+
+        IgniteReceiver receiver = new IgniteReceiver(cluster, traits, rowType, exchangeId, sourceFragmentId);
+        IgniteSender sender = new IgniteSender(cluster, traits, input, exchangeId, targetFragmentId, rel.distribution());
+
+        curr.remotes.add(receiver);
+        stack.push(new FragmentProto(sourceFragmentId, sender));
+
+        return receiver;
+    }
+
+    /** Mutable prototype of a fragment that is under construction. */
+    private static class FragmentProto {
+        /** Fragment ID. */
+        private final long id;
+
+        /** Fragment root; replaced with the result of the traversal. */
+        private IgniteRel root;
+
+        /** Receivers registered for this fragment. */
+        private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder();
+
+        /** Creates a prototype with the given fragment ID and root. */
+        private FragmentProto(long id, IgniteRel root) {
+            this.id = id;
+            this.root = root;
+        }
+
+        /** Builds a fragment from the ID, root and receivers collected so far. */
+        Fragment build() {
+            return new Fragment(id, root, remotes.build());
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
similarity index 68%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
index 01746a7..bc57ff8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
@@ -14,22 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
 
-/** Stubs */
-public class Stubs {
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Generates sequential {@code long} identifiers from a single static counter. */
+public class IdGenerator {
     /** */
-    public static int intFoo(Object... args) {
-        return args == null ? 0 : args.length;
-    }
+    private static final AtomicLong ID_GEN = new AtomicLong();
 
     /** */
-    public static boolean boolFoo(Object... args) {
-        return args == null;
-    }
+    private IdGenerator() {}
 
     /** */
-    public static String stringFoo(Object... args) {
-        return args == null ? "null" : "not null";
+    public static long nextId() {
+        return ID_GEN.getAndIncrement();
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
new file mode 100644
index 0000000..6337018
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+
+/**
+ * Plan of a regular query or DML statement that consists of several fragments.
+ */
+public interface MultiStepPlan extends QueryPlan {
+    /**
+     * @return Query fragments.
+     */
+    List<Fragment> fragments();
+
+    /**
+     * @param fragment Fragment.
+     * @return Mapping for a given fragment.
+     */
+    FragmentMapping mapping(Fragment fragment);
+
+    /** @return Colocation group of the target of the given fragment (NOTE: exact semantics are implementation-defined). */
+    ColocationGroup target(Fragment fragment);
+
+    /** @return Node ID lists keyed by a {@code long} id for the given fragment (presumably exchange id; TODO confirm). */
+    Map<Long, List<UUID>> remotes(Fragment fragment);
+
+    /**
+     * Inits query fragments.
+     *
+     * @param ctx Planner context.
+     */
+    void init(PlanningContext ctx);
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
similarity index 59%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
index 01746a7..9ef2cfb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
@@ -14,22 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
 
-/** Stubs */
-public class Stubs {
-    /** */
-    public static int intFoo(Object... args) {
-        return args == null ? 0 : args.length;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+/**
+ * Distributed query plan.
+ */
+public class MultiStepQueryPlan extends AbstractMultiStepPlan {
+    /**
+     * @param fieldsMeta Fields metadata.
+     */
+    public MultiStepQueryPlan(QueryTemplate queryTemplate, Object fieldsMeta) {
+        super(queryTemplate, fieldsMeta);
     }
 
-    /** */
-    public static boolean boolFoo(Object... args) {
-        return args == null;
+    /** {@inheritDoc} */
+    @Override public Type type() {
+        return Type.QUERY;
     }
 
-    /** */
-    public static String stringFoo(Object... args) {
-        return args == null ? "null" : "not null";
+    /** {@inheritDoc} */
+    @Override public QueryPlan copy() {
+        return new MultiStepQueryPlan(queryTemplate, fieldsMetadata);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index ec597bb..8a895a4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -68,6 +68,9 @@ public final class PlanningContext implements Context {
     private final Object[] parameters;
 
     /** */
+    private final long topVer;
+
+    /** */
     private final IgniteTypeFactory typeFactory;
 
     /** */
@@ -91,11 +94,14 @@ public final class PlanningContext implements Context {
         UUID locNodeId,
         UUID originatingNodeId,
         String qry,
-        Object[] parameters) {
+        Object[] parameters,
+        long topVer
+    ) {
         this.locNodeId = locNodeId;
         this.originatingNodeId = originatingNodeId;
         this.qry = qry;
         this.parameters = parameters;
+        this.topVer = topVer;
 
         this.parentCtx = Contexts.chain(parentCtx, cfg.getContext());
         // link frameworkConfig#context() to this.
@@ -141,6 +147,13 @@ public final class PlanningContext implements Context {
         return parameters;
     }
 
+    /**
+     * @return Topology version.
+     */
+    public long topologyVersion() {
+        return topVer;
+    }
+
     // Helper methods
     /**
      * @return Sql operators table.
@@ -276,7 +289,7 @@ public final class PlanningContext implements Context {
     /**
      * Planner context builder.
      */
-    @SuppressWarnings("PublicInnerClass") 
+    @SuppressWarnings("PublicInnerClass")
     public static class Builder {
         /** */
         private static final FrameworkConfig EMPTY_CONFIG =
@@ -303,6 +316,9 @@ public final class PlanningContext implements Context {
         /** */
         private Object[] parameters;
 
+        /** */
+        private long topVer;
+
         /**
          * @param locNodeId Local node ID.
          * @return Builder for chaining.
@@ -359,12 +375,21 @@ public final class PlanningContext implements Context {
         }
 
         /**
+         * @param topVer Topology version.
+         * @return Builder for chaining.
+         */
+        public Builder topologyVersion(long topVer) {
+            this.topVer = topVer;
+            return this;
+        }
+
+        /**
          * Builds planner context.
          *
          * @return Planner context.
          */
         public PlanningContext build() {
-            return new PlanningContext(frameworkCfg, parentCtx, locNodeId, originatingNodeId, qry, parameters);
+            return new PlanningContext(frameworkCfg, parentCtx, locNodeId, originatingNodeId, qry, parameters, topVer);
         }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
similarity index 65%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
index 01746a7..6f9161b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
@@ -14,22 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
 
-/** Stubs */
-public class Stubs {
-    /** */
-    public static int intFoo(Object... args) {
-        return args == null ? 0 : args.length;
-    }
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-    /** */
-    public static boolean boolFoo(Object... args) {
-        return args == null;
-    }
+/**
+ * Query plan of a particular {@link Type} that can be copied.
+ */
+public interface QueryPlan {
+    /** Query type */
+    enum Type { QUERY, FRAGMENT, DML, DDL, EXPLAIN }
+
+    /**
+     * @return Query type.
+     */
+    Type type();
 
-    /** */
-    public static String stringFoo(Object... args) {
-        return args == null ? "null" : "not null";
-    }
+    /**
+     * Clones this plan.
+     */
+    QueryPlan copy();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
new file mode 100644
index 0000000..5e2a7c2
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+/** Holds detached query fragments and maps them to an execution plan for the topology version of a planning context. */
+public class QueryTemplate {
+    /** Service used to map fragments. */
+    private final MappingService mappingService;
+
+    /** Detached fragments of the query. */
+    private final ImmutableList<Fragment> fragments;
+
+    /** Most recently cached execution plan, if any. */
+    private final AtomicReference<ExecutionPlan> executionPlan = new AtomicReference<>();
+
+    /** Stores the mapping service and the result of {@code detach()} for each of the given fragments. */
+    public QueryTemplate(MappingService mappingService, List<Fragment> fragments) {
+        this.mappingService = mappingService;
+
+        ImmutableList.Builder<Fragment> b = ImmutableList.builder();
+        for (Fragment fragment : fragments)
+            b.add(fragment.detach());
+
+        this.fragments = b.build();
+    }
+
+    /** Returns the cached plan when its topology version matches, else maps fragments (splitting a failed one) up to 3 times. */
+    public ExecutionPlan map(PlanningContext ctx) {
+        ExecutionPlan executionPlan = this.executionPlan.get();
+        if (executionPlan != null && Objects.equals(executionPlan.topologyVersion(), ctx.topologyVersion()))
+            return executionPlan;
+
+        List<Fragment> fragments = Commons.transform(this.fragments, f -> f.attach(ctx));
+
+        Exception ex = null;
+        RelMetadataQuery mq = first(fragments).root().getCluster().getMetadataQuery();
+        for (int i = 0; i < 3; i++) {
+            try {
+                ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(fragments, ctx, mq));
+
+                if (executionPlan == null || executionPlan.topologyVersion() < executionPlan0.topologyVersion())
+                    this.executionPlan.compareAndSet(executionPlan, executionPlan0);
+
+                return executionPlan0;
+            }
+            catch (FragmentMappingException e) {
+                if (ex == null)
+                    ex = e;
+                else
+                    ex.addSuppressed(e);
+
+                fragments = replace(fragments, e.fragment(), new FragmentSplitter(e.node()).go(e.fragment()));
+            }
+        }
+
+        throw new IgniteException("Failed to map query.", ex);
+    }
+
+    /** Maps every fragment using the mapping service and returns the detached results. */
+    @NotNull private List<Fragment> map(List<Fragment> fragments, PlanningContext ctx, RelMetadataQuery mq) {
+        ImmutableList.Builder<Fragment> b = ImmutableList.builder();
+        for (Fragment fragment : fragments)
+            b.add(fragment.map(mappingService, ctx, mq).detach());
+
+        return b.build();
+    }
+
+    /** Substitutes {@code fragment} with {@code replacement} and retargets senders whose receiver is in the replacement. */
+    private List<Fragment> replace(List<Fragment> fragments, Fragment fragment, List<Fragment> replacement) {
+        assert !nullOrEmpty(replacement);
+
+        Map<Long, Long> newTargets = new HashMap<>();
+        for (Fragment fragment0 : replacement) {
+            for (IgniteReceiver remote : fragment0.remotes())
+                newTargets.put(remote.exchangeId(), fragment0.fragmentId());
+        }
+
+        List<Fragment> fragments0 = new ArrayList<>(fragments.size() + replacement.size() - 1);
+        for (Fragment fragment0 : fragments) {
+            if (fragment0 == fragment)
+                fragment0 = first(replacement);
+            else if (!fragment0.rootFragment()) {
+                IgniteSender sender = (IgniteSender)fragment0.root();
+                Long newTargetId = newTargets.get(sender.exchangeId());
+
+                if (newTargetId != null) {
+                    sender = new IgniteSender(sender.getCluster(), sender.getTraitSet(),
+                        sender.getInput(), sender.exchangeId(), newTargetId, sender.distribution());
+
+                    fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes());
+                }
+            }
+
+            fragments0.add(fragment0);
+        }
+
+        fragments0.addAll(replacement.subList(1, replacement.size()));
+
+        return fragments0;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
new file mode 100644
index 0000000..ec0dd76
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.SourceAwareIgniteRel;
+
+/**
+ * Splits a query into a list of query fragments, cutting the tree at every exchange.
+ */
+public class Splitter extends IgniteRelShuttle {
+    /** Fragment prototypes that still have to be traversed. */
+    private final Deque<FragmentProto> stack = new LinkedList<>();
+
+    /** Fragment prototype that is currently being traversed. */
+    private FragmentProto curr;
+
+    /** Traverses the tree starting from {@code root} and returns the fragments it was split into. */
+    public List<Fragment> go(IgniteRel root) {
+        ArrayList<Fragment> res = new ArrayList<>();
+
+        stack.push(new FragmentProto(IdGenerator.nextId(), root));
+
+        while (!stack.isEmpty()) {
+            curr = stack.pop();
+
+            curr.root = visit(curr.root);
+
+            res.add(curr.build());
+
+            curr = null;
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} Always throws {@link AssertionError}. */
+    @Override public IgniteRel visit(IgniteReceiver rel) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} Returns a receiver in place of the exchange and queues a fragment rooted at a sender over its input. */
+    @Override public IgniteRel visit(IgniteExchange rel) {
+        RelOptCluster cluster = rel.getCluster();
+
+        long targetFragmentId = curr.id;
+        long sourceFragmentId = IdGenerator.nextId();
+        long exchangeId = sourceFragmentId;
+
+        IgniteReceiver receiver = new IgniteReceiver(cluster, rel.getTraitSet(), rel.getRowType(), exchangeId, sourceFragmentId);
+        IgniteSender sender = new IgniteSender(cluster, rel.getTraitSet(), rel.getInput(), exchangeId, targetFragmentId,
+            rel.distribution());
+
+        curr.remotes.add(receiver);
+        stack.push(new FragmentProto(sourceFragmentId, sender));
+
+        return receiver;
+    }
+
+    /** {@inheritDoc} Processes the node and returns its clone created with a freshly generated ID. */
+    @Override public IgniteRel visit(IgniteTrimExchange rel) {
+        return ((SourceAwareIgniteRel)processNode(rel)).clone(IdGenerator.nextId());
+    }
+
+    /** {@inheritDoc} Returns a clone of the scan created with a freshly generated ID. */
+    @Override public IgniteRel visit(IgniteIndexScan rel) {
+        return rel.clone(IdGenerator.nextId());
+    }
+
+    /** {@inheritDoc} Returns a clone of the scan created with a freshly generated ID. */
+    @Override public IgniteRel visit(IgniteTableScan rel) {
+        return rel.clone(IdGenerator.nextId());
+    }
+
+    /** Mutable prototype of a fragment that is under construction. */
+    private static class FragmentProto {
+        /** Fragment ID. */
+        private final long id;
+
+        /** Fragment root; replaced with the result of the traversal. */
+        private IgniteRel root;
+
+        /** Receivers registered for this fragment. */
+        private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder();
+
+        /** Creates a prototype with the given fragment ID and root. */
+        private FragmentProto(long id, IgniteRel root) {
+            this.id = id;
+            this.root = root;
+        }
+
+        /** Builds a fragment from the ID, root and receivers collected so far. */
+        Fragment build() {
+            return new Fragment(id, root, remotes.build());
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteSpool.java
new file mode 100644
index 0000000..6c283c6
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteSpool.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Spool;
+
+/**
+ * Base class of Ignite spool relational operators; the write type is always {@code EAGER}.
+ */
+public abstract class AbstractIgniteSpool extends Spool implements IgniteRel {
+    /** Creates a spool over {@code input} with the given read type and the {@code EAGER} write type. */
+    public AbstractIgniteSpool(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        Type readType,
+        RelNode input
+    ) {
+        super(cluster, traits, input, readType, Type.EAGER);
+    }
+
+    /** {@inheritDoc} Writes the input and the names of the read and write types. */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return pw
+            .input("input", getInput())
+            .item("readType", readType.name())
+            .item("writeType", writeType.name());
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashIndexSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashIndexSpool.java
index 77cd1fd..5e96a71 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashIndexSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashIndexSpool.java
@@ -40,7 +40,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
  * Relational operator that returns the hashed contents of a table
  * and allow to lookup rows by specified keys.
  */
-public class IgniteHashIndexSpool extends Spool implements IgniteRel {
+public class IgniteHashIndexSpool extends AbstractIgniteSpool implements IgniteRel {
     /** Search row. */
     private final List<RexNode> searchRow;
 
@@ -58,7 +58,7 @@ public class IgniteHashIndexSpool extends Spool implements IgniteRel {
         List<RexNode> searchRow,
         RexNode cond
     ) {
-        super(cluster, traits, input, Type.LAZY, Type.EAGER);
+        super(cluster, traits, Type.LAZY, input);
 
         assert !nullOrEmpty(searchRow);
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
index 851d99f..c5432b6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
@@ -41,6 +41,7 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
@@ -86,8 +87,8 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
             input.getExpression("condition"),
             ImmutableSet.copyOf(Commons.transform(input.getIntegerList("variablesSet"), CorrelationId::new)),
             input.getEnum("joinType", JoinRelType.class),
-            null,//((RelInputEx)input).getCollation("leftCollation"),
-            null//((RelInputEx)input).getCollation("rightCollation")
+            ((RelInputEx)input).getCollation("leftCollation"),
+            ((RelInputEx)input).getCollation("rightCollation")
         );
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortedIndexSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortedIndexSpool.java
index 8dfea74..1d16d58 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortedIndexSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortedIndexSpool.java
@@ -39,7 +39,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
  * Relational operator that returns the sorted contents of a table
  * and allow to lookup rows by specified bounds.
  */
-public class IgniteSortedIndexSpool extends Spool implements IgniteRel {
+public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements IgniteRel {
     /** */
     private final RelCollation collation;
 
@@ -58,7 +58,7 @@ public class IgniteSortedIndexSpool extends Spool implements IgniteRel {
         RexNode condition,
         IndexConditions idxCond
     ) {
-        super(cluster, traits, input, Type.LAZY, Type.EAGER);
+        super(cluster, traits, Type.LAZY, input);
 
         assert Objects.nonNull(idxCond);
         assert Objects.nonNull(condition);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
index aa07c78..031fdeb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableFunctionScan;
 import org.apache.calcite.rel.metadata.RelColumnMapping;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 /**
@@ -52,6 +54,15 @@ public class IgniteTableFunctionScan extends TableFunctionScan implements Ignite
         super(cluster, traits, ImmutableList.of(), call, null, rowType, null);
     }
 
+    /**
+     * Constructor used for deserialization.
+     * Runs {@code input} through {@code changeTraits} with {@link IgniteConvention#INSTANCE} before delegating.
+     * @param input Serialized representation.
+     */
+    public IgniteTableFunctionScan(RelInput input) {
+        super(changeTraits(input, IgniteConvention.INSTANCE));
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteTableFunctionScan(cluster, getTraitSet(), getCall(), getRowType());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
index 166c432..db5a054 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
@@ -73,7 +73,7 @@ public class IgniteTableModify extends TableModify implements IgniteRel {
             input.getInput(),
             input.getEnum("operation", Operation.class),
             input.getStringList("updateColumnList"),
-            input.getExpressionList("sourceExpressionList"),
+            input.get("sourceExpressionList") != null ? input.getExpressionList("sourceExpressionList") : null,
             input.getBoolean("flattened", true)
         );
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
index 9c5a266..cdd7907 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -32,7 +33,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteC
 /**
  * Relational operator that returns the contents of a table.
  */
-public class IgniteTableSpool extends Spool implements IgniteRel {
+public class IgniteTableSpool extends AbstractIgniteSpool implements IgniteRel {
     /** */
     public IgniteTableSpool(
         RelOptCluster cluster,
@@ -40,7 +41,7 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
         Spool.Type readType,
         RelNode input
     ) {
-        super(cluster, traits, input, readType, Type.EAGER);
+        super(cluster, traits, readType, input);
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 56c1a5a..9554cb4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -25,6 +25,8 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
@@ -76,6 +78,14 @@ public interface IgniteTable extends TranslatableTable {
     IgniteLogicalIndexScan toRel(RelOptCluster cluster, RelOptTable relOptTbl, String idxName);
 
     /**
+     * Returns the colocation group (nodes mapping) of this table.
+     *
+     * @param ctx Planning context.
+     * @return Colocation group describing the nodes mapping.
+     */
+    ColocationGroup colocationGroup(PlanningContext ctx);
+
+    /**
      * @return Table distribution.
      */
     IgniteDistribution distribution();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
new file mode 100644
index 0000000..8786074
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import java.lang.reflect.Method;
+
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMappingMetadata;
+
+/**
+ * Registry of reflective {@link Method} handles that are referenced by metadata definitions.
+ */
+public enum IgniteMethod {
+    /** Handle for {@link FragmentMappingMetadata#fragmentMapping()}. */
+    FRAGMENT_MAPPING(FragmentMappingMetadata.class, "fragmentMapping");
+
+    /** Method looked up once, when the enum constant is created. */
+    private final Method method;
+
+    /**
+     * @param cls Class where to look the method up.
+     * @param mtdName Name of the method.
+     * @param paramTypes Types of the method parameters.
+     */
+    IgniteMethod(Class<?> cls, String mtdName, Class<?>... paramTypes) {
+        this.method = Types.lookupMethod(cls, mtdName, paramTypes);
+    }
+
+    /**
+     * @return Method resolved for this constant.
+     */
+    public Method method() {
+        return this.method;
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 01403c5..778df9b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.config.CalciteConnectionConfig;
@@ -37,6 +38,7 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelDistribution;
@@ -59,9 +61,14 @@ import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql2rel.InitializerContext;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
@@ -79,14 +86,18 @@ import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTr
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.util.ArrayUtils;
 
 import static org.apache.calcite.tools.Frameworks.createRootSchema;
 import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
+import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
 import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /** */
@@ -207,6 +218,8 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
             try {
                 IgniteRel rel = PlannerHelper.optimize(sqlNode, planner);
 
+                checkSplitAndSerialization(rel, publicSchema);
+
 //                System.out.println(RelOptUtil.toString(rel));
 
                 return rel;
@@ -398,6 +411,84 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
     }
 
     /** */
+    protected void checkSplitAndSerialization(IgniteRel rel, IgniteSchema publicSchema) {
+        assertNotNull(rel);
+        // Round trip: clone -> split into fragments -> JSON -> read back -> deep-compare with the fragment roots.
+        rel = Cloner.clone(rel);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        List<Fragment> fragments = new Splitter().go(rel);
+        List<String> serialized = new ArrayList<>(fragments.size());
+
+        for (Fragment fragment : fragments)
+            serialized.add(toJson(fragment.root()));
+
+        // Read the JSON back through a fresh planning context built over the same schema.
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        List<UUID> nodes = new ArrayList<>(4);
+
+        for (int i = 0; i < 4; i++)
+            nodes.add(UUID.randomUUID());
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(nodes))
+            .originatingNodeId(first(nodes))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .build();
+
+        List<RelNode> deserializedNodes = new ArrayList<>();
+
+        try (IgnitePlanner ignored = ctx.planner()) {
+            for (String s : serialized) {
+                RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
+                deserializedNodes.add(reader.read(s));
+            }
+        }
+
+        List<RelNode> expectedRels = fragments.stream()
+            .map(Fragment::root)
+            .collect(Collectors.toList());
+
+        assertEquals(expectedRels.size(), deserializedNodes.size(), "Invalid deserialization fragments count");
+
+        for (int i = 0; i < expectedRels.size(); ++i) {
+            RelNode expected = expectedRels.get(i);
+            RelNode deserialized = deserializedNodes.get(i);
+
+            clearTraits(expected);
+            clearTraits(deserialized);
+
+            // The failure message is built lazily: plans are rendered to text only when the comparison fails.
+            assertTrue(
+                expected.deepEquals(deserialized),
+                () -> "Invalid serialization / deserialization.\n" +
+                    "Expected:\n" + RelOptUtil.toString(expected) +
+                    "Deserialized:\n" + RelOptUtil.toString(deserialized)
+            );
+        }
+    }
+
+    /** Sets the {@code traitSet} field of {@code rel} and, recursively, of all its inputs to an empty trait set. */
+    protected void clearTraits(RelNode rel) {
+        IgniteTestUtils.setFieldValue(rel, AbstractRelNode.class, "traitSet", RelTraitSet.createEmpty());
+        rel.getInputs().forEach(this::clearTraits);
+    }
+
+    /** */
     abstract static class TestTable implements IgniteTable {
         /** */
         private final String name;
@@ -531,6 +622,11 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+            throw new AssertionError(); // Stub: tests that need a colocation group override this method.
+        }
+
+        /** {@inheritDoc} */
         @Override public IgniteDistribution distribution() {
             throw new AssertionError();
         }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
new file mode 100644
index 0000000..4fe7ef4
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -0,0 +1,1847 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
+import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ *
+ */
+public class PlannerTest extends AbstractPlannerTest {
+    /** Identifiers of the four test cluster nodes; generated once by {@link #init()}. */
+    private static List<UUID> NODES;
+
+    /** Generates the random node identifiers used by the tests of this class. */
+    @BeforeAll
+    public static void init() {
+        NODES = new ArrayList<>(4);
+        while (NODES.size() < 4)
+            NODES.add(UUID.randomUUID());
+    }
+
+    /** Checks that a join with a subquery over schema-qualified tables is parsed, validated and converted.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLogicalPlan() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 + 1 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse the SQL text.
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate the parsed tree.
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert the validated tree to a graph of relational operators.
+            relRoot = planner.rel(sqlNode);
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    /** Checks that a join query with unqualified table names is parsed, validated and converted to a rel tree.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLogicalPlanDefaultSchema() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse the SQL text.
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate the parsed tree.
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert the validated tree to a graph of relational operators.
+            relRoot = planner.rel(sqlNode);
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    /** Checks that a query with a correlated subquery in the select list is parsed, validated and converted.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCorrelatedQuery() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, (SELECT p.name FROM Project p WHERE p.id = d.id) name, d.projectId " +
+            "FROM Developer d";
+
+        RelTraitDef<?>[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse the SQL text.
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate the parsed tree.
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert the validated tree to a graph of relational operators.
+            relRoot = planner.rel(sqlNode);
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    /** Checks that the heuristic optimization phase can be applied to a converted join query.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHepPlaner() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public IgniteIndex getIndex(String idxName) {
+                return new IgniteIndex(null, null, null);
+            }
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(NODES, 0, 1),
+                    select(NODES, 1, 2),
+                    select(NODES, 2, 0),
+                    select(NODES, 0, 1),
+                    select(NODES, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Developer", "hash");
+            }
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public IgniteIndex getIndex(String idxName) {
+                return new IgniteIndex(null, null, null);
+            }
+
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(NODES, 0, 1),
+                    select(NODES, 1, 2),
+                    select(NODES, 2, 0),
+                    select(NODES, 0, 1),
+                    select(NODES, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Project", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse the SQL text.
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate the parsed tree.
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert the validated tree to a graph of relational operators.
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Apply the heuristic optimization phase, requesting the traits the tree already has.
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    /**
+     * Runs the {@code HEURISTIC_OPTIMIZATION} and {@code OPTIMIZATION} planner phases over a join of two
+     * affinity-distributed tables, requesting a single distribution, and checks that a plan is produced.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testVolcanoPlanerDistributed() throws Exception {
+        IgniteTypeFactory typeFactory = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developerTbl = new TestTable(
+            new RelDataTypeFactory.Builder(typeFactory)
+                .add("ID", typeFactory.createJavaType(Integer.class))
+                .add("NAME", typeFactory.createJavaType(String.class))
+                .add("PROJECTID", typeFactory.createJavaType(Integer.class))
+                .build()) {
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Developer", "hash");
+            }
+        };
+
+        TestTable projectTbl = new TestTable(
+            new RelDataTypeFactory.Builder(typeFactory)
+                .add("ID", typeFactory.createJavaType(Integer.class))
+                .add("NAME", typeFactory.createJavaType(String.class))
+                .add("VER", typeFactory.createJavaType(Integer.class))
+                .build()) {
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Project", "hash");
+            }
+        };
+
+        IgniteSchema pubSchema = new IgniteSchema("PUBLIC");
+
+        pubSchema.addTable("DEVELOPER", developerTbl);
+        pubSchema.addTable("PROJECT", projectTbl);
+
+        SchemaPlus rootSchema = createRootSchema(false)
+            .add("PUBLIC", pubSchema);
+
+        String joinSql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d " +
+            "JOIN (" +
+            "   SELECT pp.id as id0, pp.ver as ver0 " +
+            "   FROM PUBLIC.Project pp" +
+            ") p ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] planTraits = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext planCtx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(rootSchema)
+                .traitDefs(planTraits)
+                .build())
+            .query(joinSql)
+            .parameters(2)
+            .build();
+
+        RelRoot root;
+
+        try (IgnitePlanner planner = planCtx.planner()) {
+            assertNotNull(planner);
+
+            String qryText = planCtx.query();
+
+            assertNotNull(qryText);
+
+            // Parse the statement and validate the resulting tree in one step.
+            SqlNode validated = planner.validate(planner.parse(qryText));
+
+            // Build the relational operator tree for the validated statement.
+            root = planner.rel(validated);
+
+            RelNode node = root.rel;
+
+            node = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, node.getTraitSet(), node);
+
+            // For the OPTIMIZATION phase request the Ignite convention with a single distribution.
+            RelTraitSet targetTraits = node.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            node = planner.transform(PlannerPhase.OPTIMIZATION, targetTraits, node);
+
+            root = root.withRel(node).withKind(validated.getKind());
+        }
+
+        assertNotNull(root.rel);
+    }
+
+    /**
+     * Joins two affinity-distributed tables with identical partition assignments on their {@code ID}
+     * columns and checks that the optimized plan is split into two fragments.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterColocatedPartitionedPartitioned() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(NODES, 0, 1),
+                    select(NODES, 1, 2),
+                    select(NODES, 2, 0),
+                    select(NODES, 0, 1),
+                    select(NODES, 1, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Developer", "hash");
+            }
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(NODES, 0, 1),
+                    select(NODES, 1, 2),
+                    select(NODES, 2, 0),
+                    select(NODES, 0, 1),
+                    select(NODES, 1, 2)));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Project", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0";
+
+        RelTraitDef<?>[] traitDefs = {
+            ConventionTraitDef.INSTANCE,
+            DistributionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        assertNotNull(ctx);
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        // Split the optimized tree into fragments and wrap them into a multi-step plan.
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+            new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+        plan.init(ctx);
+
+        assertEquals(2, plan.fragments().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterColocatedReplicatedReplicated() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forNodes(select(NODES, 0, 1, 2, 3));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forNodes(select(NODES, 0, 1, 2, 3));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+            new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+
+        assertEquals(1, plan.fragments().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forNodes(select(NODES, 0));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(NODES, 1, 2),
+                    select(NODES, 2, 3),
+                    select(NODES, 3, 0),
+                    select(NODES, 0, 1)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Project", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.id = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+            new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertEquals(3, plan.fragments().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterPartiallyColocated1() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forNodes(select(NODES, 1, 2, 3));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(NODES, 0),
+                    select(NODES, 1),
+                    select(NODES, 2)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Project", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+            new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertNotNull(plan);
+
+        assertEquals(3, plan.fragments().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterPartiallyColocated2() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forNodes(select(NODES, 0));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forAssignments(Arrays.asList(
+                    select(NODES, 1),
+                    select(NODES, 2),
+                    select(NODES, 3)
+                ));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Project", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+            new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+        assertNotNull(plan);
+
+        plan.init(ctx);
+
+        assertEquals(3, plan.fragments().size());
+    }
+
+    /**
+     * Joins two broadcast tables whose colocation groups are built from different node selections, with the
+     * {@code CorrelatedNestedLoopJoin} rule disabled, and checks that the plan is split into two fragments.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSplitterNonColocated() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developer = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("PROJECTID", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forNodes(select(NODES, 2));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        TestTable project = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("VER", f.createJavaType(Integer.class))
+                .build()) {
+            @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+                return ColocationGroup.forNodes(select(NODES, 0, 1));
+            }
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("DEVELOPER", developer);
+        publicSchema.addTable("PROJECT", project);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.ver0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            // Exclude the CorrelatedNestedLoopJoin rule from planning; checked for null above before first use.
+            planner.setDisabledRules(ImmutableSet.of("CorrelatedNestedLoopJoin"));
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+        }
+
+        assertNotNull(relRoot);
+
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+            new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+        plan.init(ctx);
+
+        assertEquals(2, plan.fragments().size());
+    }
+
+    /**
+     * Splits an optimized join plan into fragments, serializes each fragment root to JSON and reads it back.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSerializationDeserialization() throws Exception {
+        IgniteTypeFactory typeFactory = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable developerTbl = new TestTable(
+            new RelDataTypeFactory.Builder(typeFactory)
+                .add("ID", typeFactory.createJavaType(Integer.class))
+                .add("NAME", typeFactory.createJavaType(String.class))
+                .add("PROJECTID", typeFactory.createJavaType(Integer.class))
+                .build()) {
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Developer", "hash");
+            }
+        };
+
+        TestTable projectTbl = new TestTable(
+            new RelDataTypeFactory.Builder(typeFactory)
+                .add("ID", typeFactory.createJavaType(Integer.class))
+                .add("NAME", typeFactory.createJavaType(String.class))
+                .add("VER", typeFactory.createJavaType(Integer.class))
+                .build()) {
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "Project", "hash");
+            }
+        };
+
+        IgniteSchema pubSchema = new IgniteSchema("PUBLIC");
+
+        pubSchema.addTable("DEVELOPER", developerTbl);
+        pubSchema.addTable("PROJECT", projectTbl);
+
+        SchemaPlus rootSchema = createRootSchema(false)
+            .add("PUBLIC", pubSchema);
+
+        String joinSql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        RelTraitDef<?>[] planTraits = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext writeCtx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(rootSchema)
+                .traitDefs(planTraits)
+                .build())
+            .query(joinSql)
+            .parameters(2)
+            .build();
+
+        RelNode optimized;
+
+        try (IgnitePlanner planner = writeCtx.planner()) {
+            assertNotNull(planner);
+
+            String qryText = writeCtx.query();
+
+            assertNotNull(qryText);
+
+            // Parse the statement and validate the resulting tree in one step.
+            SqlNode validated = planner.validate(planner.parse(qryText));
+
+            // Build the relational operator tree for the validated statement.
+            optimized = planner.rel(validated).rel;
+
+            optimized = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, optimized.getTraitSet(), optimized);
+
+            // For the OPTIMIZATION phase request the Ignite convention with a single distribution.
+            RelTraitSet targetTraits = optimized.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            optimized = planner.transform(PlannerPhase.OPTIMIZATION, targetTraits, optimized);
+        }
+
+        assertNotNull(optimized);
+
+        List<Fragment> planFragments = new Splitter().go((IgniteRel)optimized);
+        List<String> jsonPlans = new ArrayList<>(planFragments.size());
+
+        for (Fragment planFragment : planFragments)
+            jsonPlans.add(toJson(planFragment.root()));
+
+        assertNotNull(jsonPlans);
+
+        PlanningContext readCtx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(rootSchema)
+                .traitDefs(planTraits)
+                .build())
+            .query(joinSql)
+            .parameters(2)
+            .build();
+
+        List<RelNode> restored = new ArrayList<>();
+
+        try (IgnitePlanner ignored = readCtx.planner()) {
+            for (String json : jsonPlans) {
+                restored.add(new RelJsonReader(readCtx.cluster(), readCtx.catalogReader()).read(json));
+            }
+        }
+
+        assertNotNull(restored);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMergeFilters() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable testTbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL", f.createJavaType(String.class))
+                .build()) {
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.single();
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", testTbl);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "" +
+            "SELECT val from (\n" +
+            "   SELECT * \n" +
+            "   FROM TEST \n" +
+            "   WHERE VAL = 10) \n" +
+            "WHERE VAL = 10";
+
+        RelTraitDef<?>[] traitDefs = {
+            ConventionTraitDef.INSTANCE,
+            DistributionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .query(sql)
+            .parameters(2)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            RelTraitSet desired = rel.getCluster()
+                .traitSetOf(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single());
+
+            RelNode phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+            assertNotNull(phys);
+
+            AtomicInteger filterCnt = new AtomicInteger();
+
+            // Counts filters in the plan; the visitor is applied through childrenAccept(), i.e. to the nodes below the root.
+            phys.childrenAccept(
+                new RelVisitor() {
+                    @Override public void visit(RelNode node, int ordinal, RelNode parent) {
+                        if (node instanceof IgniteFilter)
+                            filterCnt.incrementAndGet();
+
+                        super.visit(node, ordinal, parent);
+                    }
+                }
+            );
+
+            // Checks that the two filters were merged. NOTE(review): the plan below lists an IgniteFilter, yet 0 is asserted - confirm.
+            // Expected plan:
+            // IgniteProject(VAL=[$1])
+            //  IgniteProject(ID=[$0], VAL=[$1])
+            //    IgniteFilter(condition=[=(CAST($1):INTEGER, 10)])
+            //      IgniteTableScan(table=[[PUBLIC, TEST]])
+            assertEquals(0, filterCnt.get());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJoinPushExpressionRule() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable emp = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("DEPTNO", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        TestTable dept = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("DEPTNO", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("EMP", emp);
+        publicSchema.addTable("DEPT", dept);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "select d.deptno, e.deptno " +
+            "from dept d, emp e " +
+            "where d.deptno + e.deptno = 2";
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(first(NODES))
+            .originatingNodeId(first(NODES))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .costFactory(new IgniteCostFactory(1, 100, 1, 1))
+                .build())
+            .query(sql)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+            // Before optimization the condition is expected in a LogicalFilter above a join with condition=[true].
+            assertNotNull(rel);
+            assertEquals("" +
+                    "LogicalFilter(condition=[=(CAST(+($0, $1)):INTEGER, 2)])\n" +
+                    "  LogicalJoin(condition=[true], joinType=[inner])\n" +
+                    "    LogicalProject(DEPTNO=[$0])\n" +
+                    "      IgniteLogicalTableScan(table=[[PUBLIC, DEPT]])\n" +
+                    "    LogicalProject(DEPTNO=[$2])\n" +
+                    "      IgniteLogicalTableScan(table=[[PUBLIC, EMP]])\n",
+                RelOptUtil.toString(rel));
+
+            // Transformation chain: Ignite convention, single distribution, no correlation.
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .replace(CorrelationTrait.UNCORRELATED)
+                .simplify();
+
+            IgniteRel phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+            // After optimization the condition is expected on the correlated join and as a filter of the EMP scan.
+            assertNotNull(phys);
+            assertEquals(
+                "IgniteCorrelatedNestedLoopJoin(condition=[=(CAST(+($0, $1)):INTEGER, 2)], joinType=[inner], " +
+                    "correlationVariables=[[$cor1]])\n" +
+                    "  IgniteTableScan(table=[[PUBLIC, DEPT]], requiredColumns=[{0}])\n" +
+                    "  IgniteTableScan(table=[[PUBLIC, EMP]], filters=[=(CAST(+($cor1.DEPTNO, $t0)):INTEGER, 2)], requiredColumns=[{2}])\n",
+                RelOptUtil.toString(phys),
+                "Invalid plan:\n" + RelOptUtil.toString(phys)
+            );
+
+            checkSplitAndSerialization(phys, publicSchema);
+        }
+    }
+
+    /** Checks that an equi-join of two indexed tables with ORDER BY is planned as a merge join over two index scans. */
+    @Test
+    public void testMergeJoin() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable emp = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("DEPTNO", f.createJavaType(Integer.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        emp.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "emp_idx", emp));
+
+        TestTable dept = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("DEPTNO", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        dept.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "dep_idx", dept));
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("EMP", emp);
+        publicSchema.addTable("DEPT", dept);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        String sql = "select * from dept d join emp e on d.deptno = e.deptno and e.name = d.name order by e.name, d.deptno";
+
+        IgniteRel phys = physicalPlan(sql, publicSchema);
+
+        assertNotNull(phys);
+        assertEquals("" +
+                "IgniteMergeJoin(condition=[AND(=($0, $4), =($3, $1))], joinType=[inner], leftCollation=[[0, 1]], " +
+                "rightCollation=[[2, 1]])\n" +
+                "  IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n" +
+                "  IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n",
+            RelOptUtil.toString(phys));
+
+        checkSplitAndSerialization(phys, publicSchema);
+    }
+
+    /** Checks that a join with a non-equi condition ({@code >=}) is planned as a nested loop join, not a merge join. */
+    @Test
+    public void testMergeJoinIsNotAppliedForNonEquiJoin() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable emp = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .add("DEPTNO", f.createJavaType(Integer.class))
+                .build(), RewindabilityTrait.REWINDABLE, 1000) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        emp.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "emp_idx", emp));
+
+        TestTable dept = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("DEPTNO", f.createJavaType(Integer.class))
+                .add("NAME", f.createJavaType(String.class))
+                .build(), RewindabilityTrait.REWINDABLE, 100) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        dept.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "dep_idx", dept));
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("EMP", emp);
+        publicSchema.addTable("DEPT", dept);
+
+        String sql = "select * from dept d join emp e on d.deptno = e.deptno and e.name >= d.name order by e.name, d.deptno";
+        // NOTE(review): the third argument presumably names planner rules to switch off - confirm against physicalPlan().
+        RelNode phys = physicalPlan(sql, publicSchema, "CorrelatedNestedLoopJoin");
+
+        assertNotNull(phys);
+        assertEquals("" +
+                "IgniteProject(DEPTNO=[$3], NAME=[$4], ID=[$0], NAME0=[$1], DEPTNO0=[$2])\n" +
+                "  IgniteSort(sort0=[$1], sort1=[$3], dir0=[ASC], dir1=[ASC])\n" +
+                "    IgniteNestedLoopJoin(condition=[AND(=($3, $2), >=($1, $4))], joinType=[inner])\n" +
+                "      IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n" +
+                "      IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n",
+            RelOptUtil.toString(phys));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLimit() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable testTbl = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("VAL", f.createJavaType(String.class))
+                .build()) {
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("TEST", testTbl);
+
+        String sql = "SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY";
+        // Case 1: no ORDER BY - exactly one limit node and no sort node are expected in the plan.
+        {
+            IgniteRel phys = physicalPlan(sql, publicSchema);
+
+            assertNotNull(phys);
+
+            AtomicInteger limit = new AtomicInteger();
+            AtomicBoolean sort = new AtomicBoolean();
+
+            relTreeVisit(phys, (node, ordinal, parent) -> {
+                    if (node instanceof IgniteLimit)
+                        limit.incrementAndGet();
+
+                    if (node instanceof IgniteSort)
+                        sort.set(true);
+                }
+            );
+
+            String errMsg = "Invalid plan: \n" + RelOptUtil.toString(phys);
+
+            assertEquals(1, limit.get(), errMsg);
+            assertFalse(sort.get(), errMsg);
+
+            checkSplitAndSerialization(phys, publicSchema);
+        }
+
+        sql = "SELECT * FROM TEST ORDER BY ID OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY";
+        // Case 2: with ORDER BY - exactly one limit node and at least one sort node are expected in the plan.
+        {
+            IgniteRel phys = physicalPlan(sql, publicSchema);
+
+            assertNotNull(phys);
+
+            AtomicInteger limit = new AtomicInteger();
+            AtomicBoolean sort = new AtomicBoolean();
+
+            relTreeVisit(phys, (node, ordinal, parent) -> {
+                    if (node instanceof IgniteLimit)
+                        limit.incrementAndGet();
+
+                    if (node instanceof IgniteSort)
+                        sort.set(true);
+                }
+            );
+
+            String errMsg = "Invalid plan: \n" + RelOptUtil.toString(phys);
+
+            assertEquals(1, limit.get(), errMsg);
+            assertTrue(sort.get(), errMsg);
+
+            checkSplitAndSerialization(phys, publicSchema);
+        }
+    }
+
+    /** Checks that queries calling non-standard (MySQL, Oracle) functions can be planned, then split and serialized. */
+    @Test
+    public void testNotStandardFunctions() throws Exception {
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        publicSchema.addTable(
+            "TEST",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.affinity(0, "TEST", "hash");
+                }
+            }
+        );
+
+        String[] queries = {
+            "select REVERSE(val) from TEST", // MYSQL
+            "select TO_DATE(val, 'yyyymmdd') from TEST" // ORACLE
+        };
+
+        for (String sql : queries) {
+            IgniteRel phys = physicalPlan(
+                sql,
+                publicSchema
+            );
+
+            checkSplitAndSerialization(phys, publicSchema);
+        }
+    }
+
+    /** Picks index 0 of {@code NODES} when {@code single}, otherwise indexes 0-3; {@code topVer} and {@code filter} are unused. */
+    private List<UUID> intermediateMapping(long topVer, boolean single,
+        @Nullable Predicate<ClusterNode> filter) {
+        return single ? select(NODES, 0) : select(NODES, 0, 1, 2, 3);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index e37f524..7b1fdf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -194,6 +194,62 @@ public final class ArrayUtils {
     }
 
     /**
+     * @param arr Byte array to check, may be {@code null}.
+     * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+     */
+    public static boolean nullOrEmpty(byte[] arr) {
+        return arr == null || arr.length == 0;
+    }
+
+    /**
+     * @param arr Short array to check, may be {@code null}.
+     * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+     */
+    public static boolean nullOrEmpty(short[] arr) {
+        return arr == null || arr.length == 0;
+    }
+
+    /**
+     * @param arr Int array to check, may be {@code null}.
+     * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+     */
+    public static boolean nullOrEmpty(int[] arr) {
+        return arr == null || arr.length == 0;
+    }
+
+    /**
+     * @param arr Long array to check, may be {@code null}.
+     * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+     */
+    public static boolean nullOrEmpty(long[] arr) {
+        return arr == null || arr.length == 0;
+    }
+
+    /**
+     * @param arr Float array to check, may be {@code null}.
+     * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+     */
+    public static boolean nullOrEmpty(float[] arr) {
+        return arr == null || arr.length == 0;
+    }
+
+    /**
+     * @param arr Double array to check, may be {@code null}.
+     * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+     */
+    public static boolean nullOrEmpty(double[] arr) {
+        return arr == null || arr.length == 0;
+    }
+
+    /**
+     * @param arr Boolean array to check, may be {@code null}.
+     * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+     */
+    public static boolean nullOrEmpty(boolean[] arr) {
+        return arr == null || arr.length == 0;
+    }
+
+    /**
      * Converts array to {@link List}. Note that resulting list cannot
      * be altered in size, as it it based on the passed in array -
      * only current elements can be changed.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntIterator.java
similarity index 65%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntIterator.java
index 01746a7..5637411 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntIterator.java
@@ -14,22 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
 
-/** Stubs */
-public class Stubs {
-    /** */
-    public static int intFoo(Object... args) {
-        return args == null ? 0 : args.length;
-    }
+package org.apache.ignite.internal.util;
 
-    /** */
-    public static boolean boolFoo(Object... args) {
-        return args == null;
-    }
+/**
+ * Iterator over integer primitives.
+ */
+public interface IgniteIntIterator {
+    /**
+     * @return {@code true} if the iteration has more elements.
+     */
+    public boolean hasNext();
 
-    /** */
-    public static String stringFoo(Object... args) {
-        return args == null ? "null" : "not null";
-    }
+    /**
+     * @return Next {@code int} element of the iteration.
+     */
+    public int next();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntList.java
new file mode 100644
index 0000000..bf5882a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntList.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+
+/**
+ * Minimal list API to work with primitive ints. This list exists
+ * to avoid boxing/unboxing when using standard list from Java.
+ */
+public class IgniteIntList implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Backing array; stays {@code null} until storage is first needed. */
+    private int[] arr;
+
+    /** Number of elements in the list, which is also the position the next added element is written to. */
+    private int idx;
+
+    /**
+     * Creates an empty list; the backing array is left unallocated until elements are added.
+     */
+    public IgniteIntList() {
+        // No-op.
+    }
+
+    /**
+     * @param size Initial capacity of the backing array; the created list is still empty.
+     */
+    public IgniteIntList(int size) {
+        arr = new int[size];
+        // idx stays 0: no elements yet.
+    }
+
+    /**
+     * @param arr Array to wrap (it is not copied); all of its elements become the list content.
+     */
+    public IgniteIntList(int[] arr) {
+        this.arr = arr;
+
+        idx = arr.length;
+    }
+
+    /**
+     * @param vals Values; {@code null} or no values at all gives an empty list.
+     * @return List backed by the passed values.
+     */
+    public static IgniteIntList asList(int... vals) {
+        // The varargs array is handed to the list as is, without copying.
+        return nullOrEmpty(vals)
+            ? new IgniteIntList()
+            : new IgniteIntList(vals);
+    }
+
+    /**
+     * @param arr Backing array, used as is.
+     * @param size Number of leading elements of {@code arr} that make up the list.
+     */
+    private IgniteIntList(int[] arr, int size) {
+        this.arr = arr;
+        idx = size;
+    }
+
+    /**
+     * @return New list holding the same elements; its backing array is trimmed to the current size.
+     */
+    public IgniteIntList copy() {
+        // An empty list may have no backing array at all, so the array is not touched in that case.
+        return idx == 0
+            ? new IgniteIntList()
+            : new IgniteIntList(Arrays.copyOf(arr, idx));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof IgniteIntList))
+            return false;
+
+        IgniteIntList other = (IgniteIntList)o;
+
+        // Only the first idx slots are compared; spare capacity is ignored.
+        if (idx != other.idx)
+            return false;
+
+        if (arr != other.arr) {
+            for (int pos = 0; pos < idx; pos++) {
+                if (arr[pos] != other.arr[pos])
+                    return false;
+            }
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int hash = 1;
+
+        // Same polynomial scheme as Arrays.hashCode(int[]), limited to the first idx slots.
+        for (int pos = 0; pos < idx; pos++) {
+            hash = hash * 31 + arr[pos];
+        }
+
+        return hash;
+    }
+
+    /**
+     * Appends all elements of the given list to the end of this one, growing the backing array as needed.
+     *
+     * @param l List to add all elements of.
+     */
+    public void addAll(IgniteIntList l) {
+        assert l != null;
+
+        if (l.isEmpty())
+            return;
+
+        int required = idx + l.size();
+        // Doubling must start from at least 1: a zero-length backing array (e.g. new IgniteIntList(0))
+        // would otherwise leave "len" stuck at 0 and the loop below would never finish.
+        int len = arr == null ? 4 : Math.max(arr.length, 1);
+
+        while (len < required)
+            len <<= 1;
+
+        arr = arr == null ? new int[len] : Arrays.copyOf(arr, len);
+        System.arraycopy(l.arr, 0, arr, idx, l.size());
+        idx = required;
+    }
+
+    /**
+     * Appends an element to the end of this list, growing the backing array when it is full.
+     * @param x Value.
+     */
+    public void add(int x) {
+        if (arr == null)
+            arr = new int[4];
+        else if (arr.length == idx) // A zero-length array cannot grow by doubling, so restart at 4.
+            arr = Arrays.copyOf(arr, arr.length == 0 ? 4 : arr.length << 1);
+
+        arr[idx++] = x;
+    }
+
+    /**
+     * Clears the list by resetting its size; the backing array is left untouched.
+     */
+    public void clear() {
+        idx = 0;
+    }
+
+    /**
+     * Gets the last element. No emptiness check is done here, so the list must not be empty.
+     *
+     * @return The last element.
+     */
+    public int last() {
+        return arr[idx - 1];
+    }
+
+    /**
+     * Pops the last element: takes it off the list and returns it. Pairs with {@link #add(int)} for stack like usage.
+     *
+     * @return Removed element.
+     * @throws NoSuchElementException If the list is empty.
+     */
+    public int remove() throws NoSuchElementException {
+        if (idx != 0)
+            return arr[--idx];
+
+        throw new NoSuchElementException();
+    }
+
+    /**
+     * Returns (possibly reordered) copy of this list, excluding all elements of given list.
+     *
+     * @param l List of elements to remove.
+     * @return New list without all elements from {@code l}.
+     */
+    public IgniteIntList copyWithout(IgniteIntList l) {
+        assert l != null;
+
+        if (idx == 0)
+            return new IgniteIntList();
+
+        if (l.idx == 0)
+            return new IgniteIntList(Arrays.copyOf(arr, idx));
+
+        int[] newArr = Arrays.copyOf(arr, idx);
+        int newIdx = idx;
+
+        for (int i = 0; i < l.size(); i++) {
+            int rmVal = l.get(i);
+
+            for (int j = 0; j < newIdx; j++) {
+                if (newArr[j] == rmVal) {
+                    // Drop trailing occurrences first, so the value moved into slot j is never rmVal itself.
+                    while (newIdx > 0 && newArr[newIdx - 1] == rmVal)
+                        newIdx--;
+                    // Slot j may itself have been trimmed away just now; filling it then would lose a kept element.
+                    if (j < newIdx) {
+                        newArr[j] = newArr[newIdx - 1];
+                        newIdx--;
+                    }
+                }
+            }
+        }
+
+        return new IgniteIntList(newArr, newIdx);
+    }
+
+    /**
+     * @param i Index; it is checked against the list size only by an assertion.
+     * @return Value.
+     */
+    public int get(int i) {
+        assert i < idx;
+
+        return arr[i];
+    }
+
+    /**
+     * @return Number of elements currently in this list.
+     */
+    public int size() {
+        return idx;
+    }
+
+    /**
+     * @return {@code true} if this list has no elements.
+     */
+    public boolean isEmpty() {
+        return idx == 0;
+    }
+
+    /**
+     * @param l Value to look for.
+     * @return {@code true} if the value is present among the list elements.
+     */
+    public boolean contains(int l) {
+        int pos = 0;
+
+        while (pos < idx && arr[pos] != l)
+            pos++;
+
+        return pos < idx;
+    }
+
+    /**
+     * @param l List to check.
+     * @return {@code true} if every element of the passed in list is also present in this list.
+     */
+    public boolean containsAll(IgniteIntList l) {
+        int pos = 0;
+
+        while (pos < l.size() && contains(l.get(pos)))
+            pos++;
+
+        return pos == l.size();
+    }
+
+    /**
+     * @return {@code true} if no value occurs in this list more than once.
+     */
+    public boolean distinct() {
+        for (int hi = 1; hi < idx; hi++) {
+            for (int lo = 0; lo < hi; lo++) {
+                if (arr[lo] == arr[hi])
+                    return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @param size New size, asserted to lie between 0 and the current size.
+     * @param last If {@code true} the last elements are dropped, otherwise the first ones are dropped.
+     */
+    public void truncate(int size, boolean last) {
+        assert size >= 0 && size <= idx;
+
+        if (size != idx) {
+            // Dropping the head: slide the surviving tail down to index 0.
+            if (!last && idx != 0 && size != 0)
+                System.arraycopy(arr, idx - size, arr, 0, size);
+
+            idx = size;
+        }
+    }
+
+    /**
+     * Removes the element at the given index, keeping the order of the remaining elements.
+     *
+     * @param i Index.
+     * @return Removed value.
+     */
+    public int removeIndex(int i) {
+        assert i < idx : i;
+
+        int removed = arr[i];
+        int tailLen = idx - i - 1;
+
+        // Close the gap by shifting everything after position i one slot to the left;
+        // removing the last element leaves nothing to shift.
+        if (tailLen != 0)
+            System.arraycopy(arr, i + 1, arr, i, tailLen);
+
+        idx--;
+
+        return removed;
+    }
+
+    /**
+     * Removes the first occurrence of the value found at or after the given index.
+     *
+     * @param startIdx Index to begin search with.
+     * @param val Value.
+     * @return Index of removed value if the value was found and removed or {@code -1} otherwise.
+     */
+    public int removeValue(int startIdx, int val) {
+        assert startIdx >= 0;
+
+        int pos = startIdx;
+        while (pos < idx && arr[pos] != val)
+            pos++;
+
+        if (pos >= idx)
+            return -1;
+
+        removeIndex(pos);
+        return pos;
+    }
+
+    /**
+     * Replaces the first occurrence of a value found at or after the given index.
+     *
+     * @param startIdx Index to begin search with.
+     * @param oldVal Old value.
+     * @param newVal New value.
+     * @return Index of replaced value if the value was found and replaced or {@code -1} otherwise.
+     */
+    public int replaceValue(int startIdx, int oldVal, int newVal) {
+        int pos = startIdx;
+        while (pos < idx && arr[pos] != oldVal)
+            pos++;
+
+        if (pos >= idx)
+            return -1;
+
+        arr[pos] = newVal;
+        return pos;
+    }
+
+    /**
+     * @return New array holding a copy of the list elements; an empty array if the list is empty.
+     */
+    public int[] array() {
+        // The backing array is allocated lazily, so an empty list may not have one to copy from.
+        if (idx == 0)
+            return new int[0];
+
+        return Arrays.copyOf(arr, idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(idx);
+        // Only the idx elements in use follow the size; spare capacity is not written.
+        for (int i = 0; i < idx; i++)
+            out.writeInt(arr[i]);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        idx = in.readInt();
+        // The backing array is sized to exactly the number of elements read (zero-length for an empty list).
+        arr = new int[idx];
+
+        for (int i = 0; i < idx; i++)
+            arr[i] = in.readInt();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append('[');
+
+        for (int pos = 0; pos < idx; pos++) {
+            if (pos > 0)
+                sb.append(',');
+
+            sb.append(arr[pos]);
+        }
+
+        return sb.append(']').toString();
+    }
+
+    /**
+     * @param in Input to read list from.
+     * @return List that was read, or {@code null} if a size of {@code -1} was read.
+     * @throws IOException If failed.
+     */
+    @Nullable public static IgniteIntList readFrom(DataInput in) throws IOException {
+        int size = in.readInt();
+
+        // -1 is what writeTo() emits in place of the size for a null list.
+        if (size == -1)
+            return null;
+
+        int[] vals = new int[size];
+        for (int pos = 0; pos < size; pos++)
+            vals[pos] = in.readInt();
+
+        return new IgniteIntList(vals);
+    }
+
+    /**
+     * @param out Output to write to.
+     * @param list List; {@code null} is written as a size of {@code -1} with no elements.
+     * @throws IOException If failed.
+     */
+    public static void writeTo(DataOutput out, @Nullable IgniteIntList list) throws IOException {
+        int size = list == null ? -1 : list.idx;
+
+        out.writeInt(size);
+
+        for (int pos = 0; pos < size; pos++)
+            out.writeInt(list.arr[pos]);
+    }
+
+    /**
+     * Adds all elements of {@code from} to {@code to}, creating the target list when it is {@code null}.
+     *
+     * @param to To list.
+     * @param from From list.
+     * @return To list (passed in or created).
+     */
+    public static IgniteIntList addAll(@Nullable IgniteIntList to, IgniteIntList from) {
+        IgniteIntList dst = to;
+
+        // No target was passed in: create one whose initial capacity
+        // equals the number of elements that are about to be added.
+        if (dst == null)
+            dst = new IgniteIntList(from.size());
+
+        dst.addAll(from);
+
+        return dst;
+    }
+
+    /**
+     * Sorts this list in place in ascending order; lists with fewer than two elements are left untouched.
+     * Use {@code copy().sort()} if you need a defensive copy.
+     *
+     * @return {@code this} for chaining.
+     */
+    public IgniteIntList sort() {
+        if (idx > 1)
+            Arrays.sort(arr, 0, idx);
+
+        return this;
+    }
+
+    /**
+     * Drops the given number of elements from the end of the list. Asking for more elements than
+     * the list holds simply empties it.
+     *
+     * @param cnt Count to pop from the end.
+     */
+    public void pop(int cnt) {
+        assert cnt >= 0 : cnt;
+
+        // Clamp at zero instead of letting the size go negative.
+        int remaining = idx < cnt ? 0 : idx - cnt;
+
+        idx = remaining;
+    }
+
+    /**
+     * @return Iterator over the list elements; its {@code next()} does no bounds check, so guard it with {@code hasNext()}.
+     */
+    public IgniteIntIterator iterator() {
+        return new IgniteIntIterator() {
+            int c = 0;
+
+            @Override public boolean hasNext() {
+                return c < idx;
+            }
+
+            @Override public int next() {
+                return arr[c++];
+            }
+        };
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index b54cf3f..24cc6c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -19,6 +19,10 @@ package org.apache.ignite.internal.util;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
 
 import org.jetbrains.annotations.Nullable;
 
@@ -35,11 +39,28 @@ public class IgniteUtils {
     /** Class loader used to load Ignite. */
     private static final ClassLoader igniteClassLoader = IgniteUtils.class.getClassLoader();
 
+    /** Primitive class map: primitive type name (including {@code void}) -> class; filled in the static initializer. */
+    private static final Map<String, Class<?>> primitiveMap = new HashMap<>(16, .5f);
+
+    /** Cache of classes loaded by {@code forName}: class loader -> (class name -> class). */
+    private static final ConcurrentMap<ClassLoader, ConcurrentMap<String, Class>> classCache =
+        new ConcurrentHashMap<>();
+
     /*
       Initializes enterprise check.
      */
     static {
         IgniteUtils.jdkVer = System.getProperty("java.specification.version");
+
+        primitiveMap.put("byte", byte.class);
+        primitiveMap.put("short", short.class);
+        primitiveMap.put("int", int.class);
+        primitiveMap.put("long", long.class);
+        primitiveMap.put("float", float.class);
+        primitiveMap.put("double", double.class);
+        primitiveMap.put("char", char.class);
+        primitiveMap.put("boolean", boolean.class);
+        primitiveMap.put("void", void.class);
     }
 
     /**
@@ -253,4 +274,65 @@ public class IgniteUtils {
     public static ClassLoader igniteClassLoader() {
         return igniteClassLoader;
     }
+
+    /**
+     * Gets class for provided name without applying a class name filter. Accepts primitive types names.
+     *
+     * @param clsName Class name.
+     * @param ldr Class loader; if {@code null}, the class loader used to load Ignite is used.
+     * @return Class.
+     * @throws ClassNotFoundException If class not found.
+     */
+    public static Class<?> forName(String clsName, @Nullable ClassLoader ldr) throws ClassNotFoundException {
+        return forName(clsName, ldr, null);
+    }
+
+    /**
+     * Gets class for provided name. Accepts primitive types names.
+     * <p>
+     * Loaded classes are cached per class loader. The filter, when given, is checked on every call
+     * for non-primitive names, so a class cached by an earlier call cannot bypass it.
+     *
+     * @param clsName Class name.
+     * @param ldr Class loader; if {@code null}, the class loader used to load Ignite is used.
+     * @param clsFilter Optional predicate on the class name; the class is rejected when it returns {@code false}.
+     * @return Class.
+     * @throws ClassNotFoundException If class not found or rejected by the filter.
+     */
+    public static Class<?> forName(
+        String clsName,
+        @Nullable ClassLoader ldr,
+        @Nullable Predicate<String> clsFilter
+    ) throws ClassNotFoundException {
+        assert clsName != null;
+
+        Class<?> cls = primitiveMap.get(clsName);
+
+        if (cls != null)
+            return cls;
+
+        // Enforce the filter before consulting the cache: otherwise a class loaded by a previous
+        // call (made without a filter or with a more permissive one) would be returned unchecked.
+        if (clsFilter != null && !clsFilter.test(clsName))
+            throw new ClassNotFoundException("Deserialization of class " + clsName + " is disallowed.");
+
+        if (ldr == null)
+            ldr = igniteClassLoader;
+
+        ConcurrentMap<String, Class> ldrMap = classCache.computeIfAbsent(ldr, k -> new ConcurrentHashMap<>());
+
+        cls = ldrMap.get(clsName);
+
+        if (cls == null) {
+            cls = Class.forName(clsName, true, ldr);
+
+            // Another thread may have cached the class concurrently; keep the first cached instance.
+            Class old = ldrMap.putIfAbsent(clsName, cls);
+
+            if (old != null)
+                cls = old;
+        }
+
+        return cls;
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
new file mode 100644
index 0000000..5a43f2c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Utility class for tests.
+ */
+public final class IgniteTestUtils {
+    /**
+     * Utility class, instances are not needed.
+     */
+    private IgniteTestUtils() {
+        // No-op.
+    }
+
+    /**
+     * Set object field value via reflection. The field is looked up among the fields declared by the runtime
+     * class of {@code obj}, or by {@code obj} itself when it is a {@link Class}.
+     *
+     * @param obj Object to set field value to.
+     * @param fieldName Field name to set value for.
+     * @param val New field value.
+     * @throws IgniteInternalException In case of error.
+     */
+    public static void setFieldValue(Object obj, String fieldName, Object val) throws IgniteInternalException {
+        assert obj != null;
+        assert fieldName != null;
+
+        Class<?> cls = obj instanceof Class ? (Class<?>)obj : obj.getClass();
+
+        setFieldValue(obj, cls, fieldName, val);
+    }
+
+    /**
+     * Set object field value via reflection.
+     * <p>
+     * Final instance fields are written relying on {@link Field#setAccessible(boolean)} only. The private
+     * {@code modifiers} field of {@link Field} is not patched: it is hidden from reflection since JDK 12,
+     * so looking it up fails there with {@link NoSuchFieldException}.
+     *
+     * @param obj Object to set field value to.
+     * @param cls Class to get field from.
+     * @param fieldName Field name to set value for.
+     * @param val New field value.
+     * @throws IgniteInternalException In case of error.
+     */
+    public static void setFieldValue(Object obj, Class<?> cls, String fieldName, Object val) throws IgniteInternalException {
+        assert fieldName != null;
+
+        try {
+            Field field = cls.getDeclaredField(fieldName);
+
+            int mods = field.getModifiers();
+
+            /*
+             * JLS 17.5.3: if a final field is initialized to a compile-time constant in the field declaration,
+             * changes to the final field may not be observed.
+             */
+            if (Modifier.isFinal(mods) && Modifier.isStatic(mods))
+                throw new IgniteInternalException("Modification of static final field through reflection.");
+
+            // Makes non-public fields reachable and grants write access to final instance fields.
+            field.setAccessible(true);
+
+            field.set(obj, val);
+        }
+        catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new IgniteInternalException("Failed to set object field [obj=" + obj + ", field=" + fieldName + ']', e);
+        }
+    }
+}