You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/09/29 12:57:46 UTC

[ignite] branch ignite-12248 updated: query cache improvements

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 54a02e1  query cache improvements
54a02e1 is described below

commit 54a02e1976e26bd3e3ab54c084a9fc8ae61e45e6
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Sep 29 15:49:44 2020 +0300

    query cache improvements
---
 .../query/calcite/exec/ExecutionServiceImpl.java   | 160 ++++++++++-----------
 .../calcite/prepare/AbstractMultiStepPlan.java     |  62 +-------
 .../processors/query/calcite/prepare/Cloner.java   |  65 +++------
 .../query/calcite/prepare/ExplainPlan.java         |   5 +-
 .../processors/query/calcite/prepare/Fragment.java |   2 +-
 .../prepare/{QueryPlan.java => FragmentPlan.java}  |  40 +++---
 .../query/calcite/prepare/FragmentSplitter.java    | 111 +-------------
 .../{Splitter.java => IgniteRelShuttle.java}       | 103 +++----------
 .../query/calcite/prepare/MultiStepDmlPlan.java    |  13 +-
 .../query/calcite/prepare/MultiStepQueryPlan.java  |  13 +-
 ...stractMultiStepPlan.java => QueryMappings.java} | 101 +++++--------
 .../query/calcite/prepare/QueryPlan.java           |   6 +-
 .../processors/query/calcite/prepare/Splitter.java | 120 +---------------
 13 files changed, 210 insertions(+), 591 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 6e3c0a7..59d6d49 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -28,6 +28,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitDef;
@@ -83,6 +85,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryF
 import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription;
+import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepDmlPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
@@ -366,9 +369,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
         String qry,
         Object[] params
     ) {
-        PlanningContext pctx = createContext(ctx, schema, qry, params);
+        PlanningContext pctx = createContext(Commons.convert(ctx), topologyVersion(), localNodeId(), schema, qry, params);
 
-        List<QueryPlan> qryPlans = prepareQueryPlan(pctx);
+        List<QueryPlan> qryPlans = queryPlanCache().queryPlan(pctx, new CacheKey(pctx.schemaName(), pctx.query()), this::prepareQuery);
 
         return executePlans(qryPlans, pctx);
     }
@@ -453,12 +456,8 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
-    private PlanningContext createContext(
-        @Nullable QueryContext qryCtx,
-        @Nullable String schemaName,
-        String qry,
-        Object[] params
-    ) {
+    private PlanningContext createContext(Context parent, AffinityTopologyVersion topVer, UUID originator,
+        @Nullable String schema, String qry, Object[] params) {
         RelTraitDef<?>[] traitDefs = {
             ConventionTraitDef.INSTANCE,
             RelCollationTraitDef.INSTANCE,
@@ -468,56 +467,23 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
 
         return PlanningContext.builder()
             .localNodeId(localNodeId())
-            .parentContext(Commons.convert(qryCtx))
+            .originatingNodeId(originator)
+            .parentContext(parent)
             .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                .defaultSchema(schemaName != null
-                    ? schemaHolder().schema().getSubSchema(schemaName)
+                .defaultSchema(schema != null
+                    ? schemaHolder().schema().getSubSchema(schema)
                     : schemaHolder().schema())
                 .traitDefs(traitDefs)
                 .build())
             .query(qry)
             .parameters(params)
-            .topologyVersion(topologyVersion())
-            .logger(log)
-            .build();
-    }
-
-    /** */
-    private PlanningContext createContext(
-        @Nullable String schemaName,
-        UUID originatingNodeId,
-        AffinityTopologyVersion topVer
-    ) {
-        // TODO pass to context user locale and timezone.
-
-        RelTraitDef<?>[] traitDefs = {
-            ConventionTraitDef.INSTANCE,
-            RelCollationTraitDef.INSTANCE,
-            DistributionTraitDef.INSTANCE,
-            RewindabilityTraitDef.INSTANCE
-        };
-
-        return PlanningContext.builder()
-            .localNodeId(localNodeId())
-            .originatingNodeId(originatingNodeId)
-            .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                .defaultSchema(schemaName != null
-                    ? schemaHolder().schema().getSubSchema(schemaName)
-                    : schemaHolder().schema())
-                .traitDefs(traitDefs)
-                .build())
             .topologyVersion(topVer)
             .logger(log)
             .build();
     }
 
     /** */
-    private List<QueryPlan> prepareQueryPlan(PlanningContext ctx) {
-        return queryPlanCache().queryPlan(ctx, new CacheKey(ctx.schemaName(), ctx.query()), this::prepare0);
-    }
-
-    /** */
-    private List<QueryPlan> prepare0(PlanningContext ctx) {
+    private List<QueryPlan> prepareQuery(PlanningContext ctx) {
         try {
             String qry = ctx.query();
 
@@ -552,6 +518,11 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
+    private List<QueryPlan> prepareFragment(PlanningContext ctx) {
+        return ImmutableList.of(new FragmentPlan(fromJson(ctx, ctx.query())));
+    }
+
+    /** */
     private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
         assert single(sqlNode);
 
@@ -741,7 +712,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
                         QueryStartRequest req = new QueryStartRequest(
                             qryId,
                             pctx.schemaName(),
-                            fragment0.rootSerialized(),
+                            fragment0.serialized(),
                             pctx.topologyVersion(),
                             fragmentDesc0,
                             pctx.parameters());
@@ -771,6 +742,56 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
+    private void executeFragment(UUID qryId, FragmentPlan plan, PlanningContext pctx, FragmentDescription fragmentDesc) {
+        ExecutionContext<Row> ectx = new ExecutionContext<>(taskExecutor(), pctx, qryId,
+            fragmentDesc, handler, Commons.parametersMap(pctx.parameters()));
+
+        long frId = fragmentDesc.fragmentId();
+        UUID origNodeId = pctx.originatingNodeId();
+
+        Outbox<Row> node;
+        try {
+            node = new LogicalRelImplementor<>(
+                ectx,
+                partitionService(),
+                mailboxRegistry(),
+                exchangeService(),
+                failureProcessor())
+                .go(plan.root());
+        }
+        catch (Exception ex) {
+            U.error(log, "Failed to build execution tree. ", ex);
+
+            mailboxRegistry.outboxes(qryId, frId, -1)
+                .forEach(Outbox::close);
+            mailboxRegistry.inboxes(qryId, frId, -1)
+                .forEach(Inbox::close);
+
+            try {
+                messageService().send(origNodeId, new QueryStartResponse(qryId, frId, ex));
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e);
+            }
+
+            return;
+        }
+
+        try {
+            messageService().send(origNodeId, new QueryStartResponse(qryId, frId));
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e);
+
+            node.onNodeLeft(origNodeId);
+
+            return;
+        }
+
+        node.init();
+    }
+
+    /** */
     private void register(QueryInfo info) {
         UUID qryId = info.ctx.queryId();
         PlanningContext pctx = info.ctx.planningContext();
@@ -832,50 +853,15 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     private void onMessage(UUID nodeId, QueryStartRequest msg) {
         assert nodeId != null && msg != null;
 
-        PlanningContext ctx = createContext(msg.schema(), nodeId, msg.topologyVersion());
-        ExecutionContext<Row> execCtx = new ExecutionContext<>(taskExecutor(), ctx, msg.queryId(),
-            msg.fragmentDescription(), handler, Commons.parametersMap(msg.parameters()));
-
-        Outbox<Row> node;
-        try {
-            node = new LogicalRelImplementor<>(
-                execCtx,
-                partitionService(),
-                mailboxRegistry(),
-                exchangeService(),
-                failureProcessor())
-                .go(fromJson(ctx, msg.root()));
-        }
-        catch (Exception ex) {
-            U.error(log, "Failed to build execution tree. ", ex);
-
-            mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1)
-                .forEach(Outbox::close);
-            mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1)
-                .forEach(Inbox::close);
+        PlanningContext pctx = createContext(Contexts.empty(), msg.topologyVersion(), nodeId, msg.schema(), msg.root(), msg.parameters());
 
-            try {
-                messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
-            }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e);
-            }
+        List<QueryPlan> qryPlans = queryPlanCache().queryPlan(pctx, new CacheKey(pctx.schemaName(), pctx.query()), this::prepareFragment);
 
-            return;
-        }
+        assert qryPlans.size() == 1 && qryPlans.get(0).type() == QueryPlan.Type.FRAGMENT;
 
-        try {
-            messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId()));
-        }
-        catch (IgniteCheckedException e) {
-            U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e);
+        FragmentPlan plan = (FragmentPlan)qryPlans.get(0);
 
-            node.onNodeLeft(nodeId);
-
-            return;
-        }
-
-        node.init();
+        executeFragment(msg.queryId(), plan, pctx, msg.fragmentDescription());
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 47aa7c8..f146e39 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -21,13 +21,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.util.typedef.F;
@@ -44,12 +40,16 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
     protected final List<GridQueryFieldMetadata> fieldsMeta;
 
     /** */
+    protected final QueryMappings queryMappings;
+
+    /** */
     protected Map<Long, NodesMapping> mappings;
 
     /** */
-    protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
+    protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings queryMappings) {
         this.fragments = fragments;
         this.fieldsMeta = fieldsMeta;
+        this.queryMappings = queryMappings;
     }
 
     /** {@inheritDoc} */
@@ -92,61 +92,11 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
 
     /** {@inheritDoc} */
     @Override public void init(MappingService mappingService, PlanningContext ctx) {
-        mappings = U.newHashMap(fragments.size());
-
-        RelMetadataQuery mq = F.first(fragments).root().getCluster().getMetadataQuery();
-
-        for (int i = 0, j = 0; i < fragments.size();) {
-            Fragment fragment = fragments.get(i);
-
-            try {
-                mappings.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq));
-
-                i++;
-            }
-            catch (OptimisticPlanningException e) {
-                if (++j > 3)
-                    throw new IgniteSQLException("Failed to map query.", e);
-
-                replace(fragment, new FragmentSplitter(e.node()).go(fragment));
-
-                // restart init routine.
-                mappings.clear();
-                i = 0;
-            }
-        }
+        mappings = queryMappings.map(mappingService, ctx, fragments);
     }
 
     /** */
     private NodesMapping fragmentMapping(long fragmentId) {
         return mappings == null ? null : mappings.get(fragmentId);
     }
-
-    /** */
-    private void replace(Fragment fragment, List<Fragment> replacement) {
-        assert !F.isEmpty(replacement);
-
-        Map<Long, Long> newTargets = new HashMap<>();
-
-        for (Fragment fragment0 : replacement) {
-            for (IgniteReceiver remote : fragment0.remotes())
-                newTargets.put(remote.exchangeId(), fragment0.fragmentId());
-        }
-
-        for (int i = 0; i < fragments.size(); i++) {
-            Fragment fragment0 = fragments.get(i);
-
-            if (fragment0 == fragment)
-                fragments.set(i, F.first(replacement));
-            else if (!fragment0.local()) {
-                IgniteSender sender = (IgniteSender)fragment0.root();
-                Long newTargetId = newTargets.get(sender.exchangeId());
-
-                if (newTargetId != null)
-                    sender.targetFragmentId(newTargetId);
-            }
-        }
-
-        fragments.addAll(replacement.subList(1, replacement.size()));
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 9e22c47..aeee7f5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import java.util.ArrayList;
 import java.util.List;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
@@ -42,7 +41,6 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchang
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
 
 /** */
 class Cloner implements IgniteRelVisitor<IgniteRel> {
@@ -50,7 +48,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
     private final RelOptCluster cluster;
 
     /** */
-    private FragmentProto curr;
+    private ImmutableList.Builder<IgniteReceiver> remotes;
 
     Cloner(RelOptCluster cluster) {
         this.cluster = cluster;
@@ -59,22 +57,31 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
     /**
      * Clones and associates a plan with a new cluster.
      *
-     * @param src Plan to clone.
+     * @param src Fragments to clone.
      * @return New plan.
      */
-    List<Fragment> go(List<Fragment> src) {
-        assert !F.isEmpty(src);
+    public List<Fragment> go(List<Fragment> src) {
+        return Commons.transform(src, this::go);
+    }
+
+    /**
+     * Clones and associates a plan with a new cluster.
+     *
+     * @param src Fragment to clone.
+     * @return New plan.
+     */
+    public Fragment go(Fragment src) {
+        try {
+            remotes = ImmutableList.builder();
 
-        List<Fragment> fragments = new ArrayList<>(src.size());
+            IgniteRel newRoot = visit(src.root());
+            ImmutableList<IgniteReceiver> remotes = this.remotes.build();
 
-        for (Fragment fragment : src) {
-            curr = new FragmentProto(fragment.fragmentId(), fragment.root(), fragment.rootSerialized());
-            curr.root = visit(curr.root);
-            fragments.add(curr.build());
-            curr = null;
+            return new Fragment(src.fragmentId(), newRoot, remotes, src.serialized());
+        }
+        finally {
+            remotes = null;
         }
-
-        return fragments;
     }
 
     /** {@inheritDoc} */
@@ -163,7 +170,8 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
         IgniteReceiver receiver = new IgniteReceiver(cluster, rel.getTraitSet(), rel.getRowType(),
             rel.exchangeId(), rel.sourceFragmentId());
 
-        curr.remotes.add(receiver);
+        if (remotes != null)
+            remotes.add(receiver);
 
         return receiver;
     }
@@ -203,31 +211,4 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
     @Override public IgniteRel visit(IgniteRel rel) {
         return rel.accept(this);
     }
-
-    /** */
-    private static class FragmentProto {
-        /** */
-        private final long id;
-
-        /** */
-        private IgniteRel root;
-
-        /** Serialized representation. */
-        private String rootSer;
-
-        /** */
-        private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder();
-
-        /** */
-        private FragmentProto(long id, IgniteRel root, String rootSer) {
-            this.id = id;
-            this.root = root;
-            this.rootSer = rootSer;
-        }
-
-        /** */
-        Fragment build() {
-            return new Fragment(id, root, remotes.build(), rootSer);
-        }
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java
index 6d0e3cb..b96a9ca 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java
@@ -17,9 +17,8 @@
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.List;
-
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Query explain plan.
@@ -46,7 +45,7 @@ public class ExplainPlan implements QueryPlan {
     }
 
     /** {@inheritDoc} */
-    @Override public QueryPlan clone(@Nullable PlanningContext ctx) {
+    @Override public QueryPlan clone(@NotNull PlanningContext ctx) {
         return this;
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 98db988..04b57e2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -128,7 +128,7 @@ public class Fragment {
      *
      * @return Serialized form.
      */
-    public String rootSerialized() {
+    public String serialized() {
         return rootSer != null ? rootSer : (rootSer = toJson(root()));
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
similarity index 57%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
index 7f38050..dd28f96 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java
@@ -17,23 +17,31 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.jetbrains.annotations.NotNull;
 
-/**
- *
- */
-public interface QueryPlan {
-    /** Query type */
-    enum Type { QUERY, DML, DDL, EXPLAIN }
+/** */
+public class FragmentPlan implements QueryPlan {
+    /** */
+    private final IgniteRel root;
+
+    /** */
+    public FragmentPlan(IgniteRel root) {
+        this.root = root;
+    }
+
+    /** */
+    public IgniteRel root() {
+        return root;
+    }
 
-    /**
-     * @return Query type.
-     */
-    Type type();
+    /** {@inheritDoc} */
+    @Override public Type type() {
+        return Type.FRAGMENT;
+    }
 
-    /**
-     * Clones this plan.
-     * @param ctx Planner context.
-     */
-    QueryPlan clone(@Nullable PlanningContext ctx);
+    /** {@inheritDoc} */
+    @Override public QueryPlan clone(@NotNull PlanningContext ctx) {
+        return new FragmentPlan(new Cloner(ctx.cluster()).visit(root));
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
index 1dbf3cb..0843557 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -26,31 +26,17 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
  */
-public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
+public class FragmentSplitter extends IgniteRelShuttle {
     /** */
     private final Deque<FragmentProto> stack = new LinkedList<>();
 
@@ -81,90 +67,7 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteFilter rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteTrimExchange rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteProject rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteNestedLoopJoin rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteTableModify rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteAggregate rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteMapAggregate rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteReduceAggregate rel) {
-        assert cutPoint != rel;
-
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteUnionAll rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSort rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSender rel) {
-        assert cutPoint != rel;
-
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteIndexScan rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteTableScan rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteValues rel) {
-        assert cutPoint != rel;
-
-        return rel;
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteReceiver rel) {
-        assert cutPoint != rel;
-
         curr.remotes.add(rel);
 
         return rel;
@@ -183,7 +86,7 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
     /**
      * Visits all children of a parent.
      */
-    private IgniteRel processNode(IgniteRel rel) {
+    @Override protected IgniteRel processNode(IgniteRel rel) {
         if (rel == cutPoint) {
             cutPoint = null;
 
@@ -198,16 +101,6 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> {
         return rel;
     }
 
-    /**
-     * Visits a particular child of a parent and replaces the child if it was changed.
-     */
-    private void visitChild(IgniteRel parent, int i, IgniteRel child) {
-        IgniteRel newChild = visit(child);
-
-        if (newChild != child)
-            parent.replaceInput(i, newChild);
-    }
-
     /** */
     private IgniteRel split(IgniteRel rel) {
         RelOptCluster cluster = rel.getCluster();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
similarity index 70%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index 4616756..5f235c2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -17,12 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.LinkedList;
 import java.util.List;
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
@@ -44,30 +39,11 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
-/**
- * Splits a query into a list of query fragments.
- */
-public class Splitter implements IgniteRelVisitor<IgniteRel> {
-    /** */
-    private final Deque<FragmentProto> stack = new LinkedList<>();
-
-    /** */
-    private FragmentProto curr;
-
-    /** */
-    public List<Fragment> go(IgniteRel root) {
-        ArrayList<Fragment> res = new ArrayList<>();
-
-        stack.push(new FragmentProto(Fragment.ID_GEN.getAndIncrement(), root));
-
-        while (!stack.isEmpty()) {
-            curr = stack.pop();
-            curr.root = visit(curr.root);
-            res.add(curr.build());
-            curr = null;
-        }
-
-        return res;
+/** */
+public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteSender rel) {
+        return processNode(rel);
     }
 
     /** {@inheritDoc} */
@@ -86,11 +62,6 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSort rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteNestedLoopJoin rel) {
         return processNode(rel);
     }
@@ -101,7 +72,7 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteTableModify rel) {
+    @Override public IgniteRel visit(IgniteExchange rel) {
         return processNode(rel);
     }
 
@@ -121,33 +92,38 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableModify rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteUnionAll rel) {
         return processNode(rel);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSender rel) {
+    @Override public IgniteRel visit(IgniteSort rel) {
         return processNode(rel);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteIndexScan rel) {
-        return rel;
+        return processNode(rel);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteTableScan rel) {
-        return rel;
+        return processNode(rel);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteValues rel) {
-        return rel;
+    @Override public IgniteRel visit(IgniteReceiver rel) {
+        return processNode(rel);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteReceiver rel) {
-        throw new AssertionError();
+    @Override public IgniteRel visit(IgniteValues rel) {
+        return processNode(rel);
     }
 
     /** {@inheritDoc} */
@@ -155,27 +131,10 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
         return rel.accept(this);
     }
 
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteExchange rel) {
-        RelOptCluster cluster = rel.getCluster();
-
-        long targetFragmentId = curr.id;
-        long sourceFragmentId = Fragment.ID_GEN.getAndIncrement();
-        long exchangeId = sourceFragmentId;
-
-        IgniteReceiver receiver = new IgniteReceiver(cluster, rel.getTraitSet(), rel.getRowType(), exchangeId, sourceFragmentId);
-        IgniteSender sender = new IgniteSender(cluster, rel.getTraitSet(), rel.getInput(), exchangeId, targetFragmentId, rel.distribution());
-
-        curr.remotes.add(receiver);
-        stack.push(new FragmentProto(sourceFragmentId, sender));
-
-        return receiver;
-    }
-
     /**
      * Visits all children of a parent.
      */
-    private IgniteRel processNode(IgniteRel rel) {
+    protected IgniteRel processNode(IgniteRel rel) {
         List<IgniteRel> inputs = Commons.cast(rel.getInputs());
 
         for (int i = 0; i < inputs.size(); i++)
@@ -187,32 +146,10 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
     /**
      * Visits a particular child of a parent and replaces the child if it was changed.
      */
-    private void visitChild(IgniteRel parent, int i, IgniteRel child) {
+    protected void visitChild(IgniteRel parent, int i, IgniteRel child) {
         IgniteRel newChild = visit(child);
 
         if (newChild != child)
             parent.replaceInput(i, newChild);
     }
-
-    /** */
-    private static class FragmentProto {
-        /** */
-        private final long id;
-
-        /** */
-        private IgniteRel root;
-
-        /** */
-        private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder();
-
-        /** */
-        private FragmentProto(long id, IgniteRel root) {
-            this.id = id;
-            this.root = root;
-        }
-
-        Fragment build() {
-            return new Fragment(id, root, remotes.build());
-        }
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
index 75cf46a..c36e608 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.List;
-
 import com.google.common.collect.ImmutableList;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed dml plan.
@@ -38,7 +38,12 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan {
      * @param fieldsMeta Fields metadata.
      */
     public MultiStepDmlPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
-        super(fragments, fieldsMeta);
+        this(fragments, fieldsMeta, new QueryMappings());
+    }
+
+    /** */
+    private MultiStepDmlPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings mappings) {
+        super(fragments, fieldsMeta, mappings);
     }
 
     /** {@inheritDoc} */
@@ -48,7 +53,7 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan {
 
     /** {@inheritDoc}
      * @param ctx*/
-    @Override public QueryPlan clone(PlanningContext ctx) {
-        return new MultiStepDmlPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMetadata());
+    @Override public QueryPlan clone(@NotNull PlanningContext ctx) {
+        return new MultiStepDmlPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMeta, queryMappings);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
index 7b64c9b..5d88964 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.List;
-
 import com.google.common.collect.ImmutableList;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed query plan.
@@ -39,7 +39,12 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan {
      * @param fieldsMeta Fields metadata.
      */
     public MultiStepQueryPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
-        super(fragments, fieldsMeta);
+        this(fragments, fieldsMeta, new QueryMappings());
+    }
+
+    /** */
+    private MultiStepQueryPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings mappings) {
+        super(fragments, fieldsMeta, mappings);
     }
 
     /** {@inheritDoc} */
@@ -49,7 +54,7 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan {
 
     /** {@inheritDoc}
      * @param ctx*/
-    @Override public QueryPlan clone(PlanningContext ctx) {
-        return new MultiStepQueryPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMetadata());
+    @Override public QueryPlan clone(@NotNull PlanningContext ctx) {
+        return new MultiStepQueryPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMeta, queryMappings);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java
similarity index 56%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java
index 47aa7c8..7c145c5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -31,99 +32,71 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPl
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- *
- */
-public abstract class AbstractMultiStepPlan implements MultiStepPlan {
-    /** */
-    protected final List<Fragment> fragments;
-
-    /** */
-    protected final List<GridQueryFieldMetadata> fieldsMeta;
 
+/** */
+public class QueryMappings {
     /** */
-    protected Map<Long, NodesMapping> mappings;
+    private static class Mapping {
+        /** */
+        private final AffinityTopologyVersion ver;
 
-    /** */
-    protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
-        this.fragments = fragments;
-        this.fieldsMeta = fieldsMeta;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<Fragment> fragments() {
-        return fragments;
-    }
+        /** */
+        private final Map<Long, NodesMapping> nodeMappings;
 
-    /** {@inheritDoc} */
-    @Override public List<GridQueryFieldMetadata> fieldsMetadata() {
-        return fieldsMeta;
-    }
-
-    /** {@inheritDoc} */
-    @Override public NodesMapping fragmentMapping(Fragment fragment) {
-        return fragmentMapping(fragment.fragmentId());
-    }
-
-    /** {@inheritDoc} */
-    @Override public NodesMapping targetMapping(Fragment fragment) {
-        if (fragment.local())
-            return null;
-
-        return fragmentMapping(((IgniteSender)fragment.root()).targetFragmentId());
+        /** */
+        private Mapping(AffinityTopologyVersion ver, Map<Long, NodesMapping> nodeMappings) {
+            this.ver = ver;
+            this.nodeMappings = nodeMappings;
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public Map<Long, List<UUID>> remoteSources(Fragment fragment) {
-        List<IgniteReceiver> remotes = fragment.remotes();
-
-        if (F.isEmpty(remotes))
-            return null;
+    /** */
+    private final AtomicReference<Mapping> mapping = new AtomicReference<>();
 
-        HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size());
+    /** */
+    public Map<Long, NodesMapping> map(MappingService mappingService, PlanningContext ctx, List<Fragment> fragments) {
+        Mapping mapping = this.mapping.get();
 
-        for (IgniteReceiver remote : remotes)
-            res.put(remote.exchangeId(), fragmentMapping(remote.sourceFragmentId()).nodes());
+        if (mapping != null && Objects.equals(mapping.ver, ctx.topologyVersion()))
+            return mapping.nodeMappings;
 
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void init(MappingService mappingService, PlanningContext ctx) {
-        mappings = U.newHashMap(fragments.size());
+        ImmutableMap.Builder<Long, NodesMapping> b = ImmutableMap.builder();
 
         RelMetadataQuery mq = F.first(fragments).root().getCluster().getMetadataQuery();
 
+        boolean save = true;
         for (int i = 0, j = 0; i < fragments.size();) {
             Fragment fragment = fragments.get(i);
 
             try {
-                mappings.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq));
+                b.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq));
 
                 i++;
             }
             catch (OptimisticPlanningException e) {
+                save = false; // we mustn't save mappings for mutated fragments
+
                 if (++j > 3)
                     throw new IgniteSQLException("Failed to map query.", e);
 
-                replace(fragment, new FragmentSplitter(e.node()).go(fragment));
+                replace(fragments, fragment, new FragmentSplitter(e.node()).go(fragment));
 
                 // restart init routine.
-                mappings.clear();
+                b = ImmutableMap.builder();
                 i = 0;
             }
         }
-    }
 
-    /** */
-    private NodesMapping fragmentMapping(long fragmentId) {
-        return mappings == null ? null : mappings.get(fragmentId);
+        ImmutableMap<Long, NodesMapping> mappings = b.build();
+
+        if (save)
+            this.mapping.compareAndSet(mapping, new Mapping(ctx.topologyVersion(), mappings));
+
+        return mappings;
     }
 
     /** */
-    private void replace(Fragment fragment, List<Fragment> replacement) {
+    private void replace(List<Fragment> fragments, Fragment fragment, List<Fragment> replacement) {
         assert !F.isEmpty(replacement);
 
         Map<Long, Long> newTargets = new HashMap<>();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
index 7f38050..1c204f0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
@@ -17,14 +17,14 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.NotNull;
 
 /**
  *
  */
 public interface QueryPlan {
     /** Query type */
-    enum Type { QUERY, DML, DDL, EXPLAIN }
+    enum Type { QUERY, FRAGMENT, DML, DDL, EXPLAIN }
 
     /**
      * @return Query type.
@@ -35,5 +35,5 @@ public interface QueryPlan {
      * Clones this plan.
      * @param ctx Planner context.
      */
-    QueryPlan clone(@Nullable PlanningContext ctx);
+    QueryPlan clone(@NotNull PlanningContext ctx);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index 4616756..464efa1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -23,31 +23,15 @@ import java.util.LinkedList;
 import java.util.List;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  * Splits a query into a list of query fragments.
  */
-public class Splitter implements IgniteRelVisitor<IgniteRel> {
+public class Splitter extends IgniteRelShuttle {
     /** */
     private final Deque<FragmentProto> stack = new LinkedList<>();
 
@@ -71,91 +55,11 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteFilter rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteTrimExchange rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteProject rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSort rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteNestedLoopJoin rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteTableModify rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteAggregate rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteMapAggregate rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteReduceAggregate rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteUnionAll rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteSender rel) {
-        return processNode(rel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteIndexScan rel) {
-        return rel;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteTableScan rel) {
-        return rel;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteValues rel) {
-        return rel;
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteReceiver rel) {
         throw new AssertionError();
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRel visit(IgniteRel rel) {
-        return rel.accept(this);
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteExchange rel) {
         RelOptCluster cluster = rel.getCluster();
 
@@ -172,28 +76,6 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> {
         return receiver;
     }
 
-    /**
-     * Visits all children of a parent.
-     */
-    private IgniteRel processNode(IgniteRel rel) {
-        List<IgniteRel> inputs = Commons.cast(rel.getInputs());
-
-        for (int i = 0; i < inputs.size(); i++)
-            visitChild(rel, i, inputs.get(i));
-
-        return rel;
-    }
-
-    /**
-     * Visits a particular child of a parent and replaces the child if it was changed.
-     */
-    private void visitChild(IgniteRel parent, int i, IgniteRel child) {
-        IgniteRel newChild = visit(child);
-
-        if (newChild != child)
-            parent.replaceInput(i, newChild);
-    }
-
     /** */
     private static class FragmentProto {
         /** */