You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/11/09 08:15:56 UTC

[ignite] branch sql-calcite updated: IGNITE-12991 Calcite integration. Introduce running query registry & cancellation refactoring (#9476)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 9adf8e8  IGNITE-12991 Calcite integration. Introduce running query registry & cancellation refactoring (#9476)
9adf8e8 is described below

commit 9adf8e8e4b1ab5dfa95af42c5bb46d32459a1423
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Tue Nov 9 11:15:19 2021 +0300

    IGNITE-12991 Calcite integration. Introduce running query registry & cancellation refactoring (#9476)
---
 .../query/calcite/CalciteQueryProcessor.java       | 140 ++++-
 .../internal/processors/query/calcite/Query.java   | 188 +++++++
 .../ExecutionService.java => QueryRegistry.java}   |  40 +-
 .../query/calcite/QueryRegistryImpl.java           |  70 +++
 .../query/calcite/RemoteFragmentKey.java           |  66 +++
 .../processors/query/calcite/RootQuery.java        | 348 ++++++++++++
 .../processors/query/calcite/RunningFragment.java  |  79 +++
 .../query/calcite/exec/ExchangeService.java        |   4 +-
 .../query/calcite/exec/ExchangeServiceImpl.java    |  41 +-
 .../query/calcite/exec/ExecutionContext.java       |  29 +-
 .../query/calcite/exec/ExecutionService.java       |  26 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   | 619 ++++-----------------
 .../query/calcite/exec/ddl/DdlCommandHandler.java  |  33 +-
 .../calcite/exec/ddl/NativeCommandHandler.java     |   7 +-
 .../processors/query/calcite/exec/rel/Outbox.java  |   8 +-
 .../query/calcite/message/MessageType.java         |   2 +-
 ...boxCloseMessage.java => QueryCloseMessage.java} |  74 +--
 .../calcite/prepare/AbstractMultiStepPlan.java     |   5 +-
 .../processors/query/calcite/prepare/DdlPlan.java  |   5 +
 .../query/calcite/prepare/MultiStepPlan.java       |   3 +-
 .../calcite/{exec => prepare}/PlannerHelper.java   |   5 +-
 .../query/calcite/prepare/PlanningContext.java     |   8 +
 .../PrepareService.java}                           |  10 +-
 .../query/calcite/prepare/PrepareServiceImpl.java  | 198 +++++++
 .../query/calcite/prepare/QueryTemplate.java       |  19 +-
 .../AggregateExpandDistinctAggregatesRule.java     |   1 +
 .../query/calcite/schema/SchemaHolder.java         |   5 +-
 .../query/calcite/schema/SchemaHolderImpl.java     |  10 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  12 +-
 .../processors/query/calcite/CancelTest.java       |   9 +
 .../processors/query/calcite/QueryChecker.java     |   2 +-
 .../integration/AbstractBasicIntegrationTest.java  |  20 +
 .../integration/AbstractDdlIntegrationTest.java    |  48 +-
 .../integration/IndexDdlIntegrationTest.java       |  38 +-
 .../integration/KillCommandDdlIntegrationTest.java |  25 +-
 .../integration/RunningQueriesIntegrationTest.java | 203 +++++++
 .../calcite/integration/SetOpIntegrationTest.java  |   1 +
 .../integration/TableDdlIntegrationTest.java       | 243 ++++----
 .../integration/UserDdlIntegrationTest.java        |   6 +-
 .../query/calcite/planner/AbstractPlannerTest.java |   3 +-
 .../query/calcite/planner/PlannerTest.java         |  40 +-
 .../internal/processors/query/GridQueryCancel.java |   5 +
 .../internal/processors/query/NoOpQueryEngine.java |  12 +
 .../internal/processors/query/QueryEngine.java     |   9 +
 .../internal/processors/query/QueryState.java}     |  30 +-
 .../internal/processors/query/RunningQuery.java}   |  19 +-
 .../ignite/internal/sql/SqlCommandProcessor.java   |  14 +-
 .../processors/query/h2/CommandProcessor.java      |   2 +-
 48 files changed, 1828 insertions(+), 956 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index d4fd450..28746f2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -17,7 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
+
 import org.apache.calcite.DataContexts;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.config.NullCollation;
@@ -27,6 +32,9 @@ import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.hint.HintStrategyTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.fun.SqlLibrary;
 import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -38,10 +46,12 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.RunningQuery;
 import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
@@ -59,7 +69,10 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityServ
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteConvertletTable;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteTypeCoercion;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl;
@@ -72,6 +85,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrai
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
 import org.jetbrains.annotations.Nullable;
@@ -158,7 +172,13 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
     private final MailboxRegistry mailboxRegistry;
 
     /** */
-    private final ExecutionService executionSvc;
+    private final ExecutionService<Object[]> executionSvc;
+
+    /** */
+    private final PrepareServiceImpl prepareSvc;
+
+    /** */
+    private final QueryRegistry qryReg;
 
     /**
      * @param ctx Kernal context.
@@ -176,6 +196,8 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
         msgSvc = new MessageServiceImpl(ctx);
         mappingSvc = new MappingServiceImpl(ctx);
         exchangeSvc = new ExchangeServiceImpl(ctx);
+        prepareSvc = new PrepareServiceImpl(ctx);
+        qryReg = new QueryRegistryImpl(ctx.log(QueryRegistry.class));
     }
 
     /**
@@ -241,6 +263,11 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
         return failureProcessor;
     }
 
+    /** */
+    public PrepareServiceImpl prepareService() {
+        return prepareSvc;
+    }
+
     /** {@inheritDoc} */
     @Override public void start() {
         onStart(ctx,
@@ -252,13 +279,15 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
             taskExecutor,
             mappingSvc,
             qryPlanCache,
-            exchangeSvc
+            exchangeSvc,
+            qryReg
         );
     }
 
     /** {@inheritDoc} */
     @Override public void stop(boolean cancel) {
         onStop(
+            qryReg,
             executionSvc,
             mailboxRegistry,
             partSvc,
@@ -272,10 +301,96 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
     }
 
     /** {@inheritDoc} */
-    @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
-        String qry, Object... params) throws IgniteSQLException {
+    @Override public List<FieldsQueryCursor<List<?>>> query(
+        @Nullable QueryContext qryCtx,
+        @Nullable String schemaName,
+        String sql,
+        Object... params
+    ) throws IgniteSQLException {
+        SchemaPlus schema = schemaHolder.schema(schemaName);
+
+        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql));
+
+        if (plan != null) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sql,
+                schema,
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qryReg.register(qry);
+
+            try {
+                return Collections.singletonList(executionSvc.executePlan(
+                    qry,
+                    plan
+                ));
+            }
+            catch (Exception e) {
+                boolean isCanceled = qry.isCancelled();
+
+                qry.cancel();
+
+                qryReg.unregister(qry.id());
+
+                if (isCanceled)
+                    throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
+                else
+                    throw e;
+
+            }
+        }
+
+        SqlNodeList qryList = Commons.parse(sql, FRAMEWORK_CONFIG.getParserConfig());
+        List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
 
-        return executionSvc.executeQuery(qryCtx, schemaName, qry, params);
+        List<RootQuery<Object[]>> qrys = new ArrayList<>(qryList.size());
+
+        for (final SqlNode sqlNode: qryList) {
+            RootQuery<Object[]> qry = new RootQuery<>(
+                sqlNode.toString(),
+                schemaHolder.schema(schemaName), // Update schema for each query in multiple statements.
+                params,
+                qryCtx,
+                exchangeSvc,
+                (q) -> qryReg.unregister(q.id()),
+                log
+            );
+
+            qrys.add(qry);
+
+            qryReg.register(qry);
+
+            try {
+                if (qryList.size() == 1) {
+                    plan = queryPlanCache().queryPlan(
+                        new CacheKey(schemaName, qry.sql()),
+                        () -> prepareSvc.prepareSingle(sqlNode, qry.planningContext()));
+                }
+                else
+                    plan = prepareSvc.prepareSingle(sqlNode, qry.planningContext());
+
+                cursors.add(executionSvc.executePlan(qry, plan));
+            }
+            catch (Exception e) {
+                boolean isCanceled = qry.isCancelled();
+
+                qrys.forEach(RootQuery::cancel);
+
+                qryReg.unregister(qry.id());
+
+                if (isCanceled)
+                    throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
+                else
+                    throw e;
+            }
+        }
+
+        return cursors;
     }
 
     /** */
@@ -293,4 +408,19 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
                 ((LifecycleAware) service).onStop();
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public RunningQuery runningQuery(UUID id) {
+        return qryReg.query(id);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends RunningQuery> runningQueries() {
+        return qryReg.runningQueries();
+    }
+
+    /** */
+    public QueryRegistry queryRegistry() {
+        return qryReg;
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
new file mode 100644
index 0000000..1c48bd1
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+public class Query<RowT> implements RunningQuery {
+    /** Completable futures empty array. */
+    private static final CompletableFuture<?>[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];
+
+    /** */
+    private final UUID initNodeId;
+
+    /** */
+    private final UUID id;
+
+    /** */
+    protected final Object mux = new Object();
+
+    /** */
+    protected final Set<RunningFragment<RowT>> fragments;
+
+    /** */
+    protected final GridQueryCancel cancel;
+
+    /** */
+    protected final Consumer<Query<RowT>> unregister;
+
+    /** */
+    protected volatile QueryState state = QueryState.INITED;
+
+    /** */
+    protected final ExchangeService exch;
+
+    /** Logger. */
+    protected final IgniteLogger log;
+
+    /** */
+    public Query(
+        UUID id,
+        UUID initNodeId,
+        GridQueryCancel cancel,
+        ExchangeService exch,
+        Consumer<Query<RowT>> unregister,
+        IgniteLogger log
+    ) {
+        this.id = id;
+        this.unregister = unregister;
+        this.initNodeId = initNodeId;
+        this.exch = exch;
+        this.log = log;
+
+        this.cancel = cancel != null ? cancel : new GridQueryCancel();
+
+        fragments = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    }
+
+    /** */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** */
+    @Override public QueryState state() {
+        return state;
+    }
+
+    /** */
+    public UUID initiatorNodeId() {
+        return initNodeId;
+    }
+
+    /** */
+    protected void tryClose() {
+        List<RunningFragment<RowT>> fragments = new ArrayList<>(this.fragments);
+
+        AtomicInteger cntDown = new AtomicInteger(fragments.size());
+
+        for (RunningFragment<RowT> frag : fragments) {
+            frag.context().execute(() -> {
+                frag.root().close();
+                frag.context().cancel();
+
+                if (cntDown.decrementAndGet() == 0)
+                    unregister.accept(this);
+
+            }, frag.root()::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED)
+                return;
+            
+            if (state == QueryState.INITED) {
+                state = QueryState.CLOSING;
+
+                try {
+                    exch.closeQuery(initNodeId, id);
+
+                    return;
+                }
+                catch (IgniteCheckedException e) {
+                    log.warning("Cannot send cancel request to query initiator", e);
+                }
+            }
+
+            if (state == QueryState.EXECUTING || state == QueryState.CLOSING)
+                state = QueryState.CLOSED;
+        }
+
+        for (RunningFragment<RowT> frag : fragments)
+            frag.context().execute(() -> frag.root().onError(new ExecutionCancelledException()), frag.root()::onError);
+
+        tryClose();
+    }
+
+    /** */
+    public void addFragment(RunningFragment<RowT> f) {
+        synchronized (mux) {
+            if (state == QueryState.INITED)
+                state = QueryState.EXECUTING;
+
+            if (state == QueryState.CLOSING || state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled",
+                    IgniteQueryErrorCode.QUERY_CANCELED,
+                    new ExecutionCancelledException()
+                );
+            }
+
+            fragments.add(f);
+        }
+    }
+
+    /** */
+    public boolean isCancelled() {
+        return cancel.isCanceled();
+    }
+
+    /** */
+    public void onNodeLeft(UUID nodeId) {
+        if (initNodeId.equals(nodeId))
+            cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(Query.class, this, "state", state, "fragments", fragments);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
similarity index 53%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
index 64e2f71..70573a8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
@@ -15,35 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.exec;
+package org.apache.ignite.internal.processors.query.calcite;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.UUID;
 
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.RunningQuery;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
-import org.jetbrains.annotations.Nullable;
 
 /**
- *
+ * Registry of the running queries.
  */
-public interface ExecutionService extends Service {
+public interface QueryRegistry extends Service {
+    /**
+     * Register the query or return the existing one with the same identifier.
+     *
+     * @param qry Query to register.
+     * @return Registered query.
+     */
+    RunningQuery register(RunningQuery qry);
+
     /**
-     * Executes a query.
+     * Lookup query by identifier.
      *
-     * @param ctx Query external context, contains flags and connection settings like a locale or a timezone.
-     * @param schema Schema name.
-     * @param query Query.
-     * @param params Query parameters.
-     * @return Query cursor.
+     * @param id Query identified.
+     * @return Registered query or {@code null} if the query with specified identifier isn't found.
      */
-    List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
+    RunningQuery query(UUID id);
 
     /**
-     * Cancels a running query.
+     * Unregister query by identifier.
      *
-     * @param queryId Query ID.
+     * @param id Query identifier.
      */
-    void cancelQuery(UUID queryId);
+    void unregister(UUID id);
+
+    /** */
+    Collection<? extends RunningQuery> runningQueries();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
new file mode 100644
index 0000000..429d05f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Registry of the running queries.
+ */
+public class QueryRegistryImpl implements QueryRegistry {
+    /** */
+    private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    public QueryRegistryImpl(IgniteLogger log) {
+        this.log = log;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RunningQuery register(RunningQuery qry) {
+        return runningQrys.computeIfAbsent(qry.id(), k -> qry);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RunningQuery query(UUID id) {
+        return runningQrys.get(id);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unregister(UUID id) {
+        runningQrys.remove(id);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends RunningQuery> runningQueries() {
+        return runningQrys.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void tearDown() {
+        runningQrys.values().forEach(q -> IgniteUtils.close(q::cancel, log));
+
+        runningQrys.clear();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RemoteFragmentKey.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RemoteFragmentKey.java
new file mode 100644
index 0000000..9b1eac7
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RemoteFragmentKey.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.UUID;
+
+/** */
+final class RemoteFragmentKey {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private final long fragmentId;
+
+    /** */
+    RemoteFragmentKey(UUID nodeId, long fragmentId) {
+        this.nodeId = nodeId;
+        this.fragmentId = fragmentId;
+    }
+
+    /** */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /** */
+    public long fragmentId() {
+        return fragmentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        RemoteFragmentKey that = (RemoteFragmentKey) o;
+
+        if (fragmentId != that.fragmentId)
+            return false;
+        return nodeId.equals(that.nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = nodeId.hashCode();
+        res = 31 * res + (int) (fragmentId ^ (fragmentId >>> 32));
+        return res;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
new file mode 100644
index 0000000..449b050
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.CancelFlag;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/**
+ * The RootQuery is created on the query initiator (originator) node as the first step of a query run;
+ * It contains the information about query state, contexts, remote fragments;
+ * It provides 'cancel' functionality for running query like a base query class.
+ */
+public class RootQuery<RowT> extends Query<RowT> {
+    /** SQL query. */
+    private final String sql;
+
+    /** Parameters. */
+    private final Object[] params;
+
+    /** remote nodes */
+    private final Set<UUID> remotes;
+
+    /** node to fragment */
+    private final Set<RemoteFragmentKey> waiting;
+
+    /** */
+    private volatile RootNode<RowT> root;
+
+    /** */
+    private volatile PlanningContext pctx;
+
+    /** */
+    private final BaseQueryContext ctx;
+
+    /** */
+    public RootQuery(
+        String sql,
+        SchemaPlus schema,
+        Object[] params,
+        QueryContext qryCtx,
+        ExchangeService exch,
+        Consumer<Query<RowT>> unregister,
+        IgniteLogger log
+    ) {
+        super(
+            UUID.randomUUID(),
+            null,
+            qryCtx != null ? qryCtx.unwrap(GridQueryCancel.class) : null,
+            exch,
+            unregister,
+            log
+        );
+
+        this.sql = sql;
+        this.params = params;
+
+        remotes = new HashSet<>();
+        waiting = new HashSet<>();
+
+        Context parent = Commons.convert(qryCtx);
+
+        ctx = BaseQueryContext.builder()
+            .parentContext(parent)
+            .frameworkConfig(
+                Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                    .defaultSchema(schema)
+                    .build()
+            )
+            .logger(log)
+            .build();
+    }
+
+    /**
+     * Creates the new root that inherits the query parameters from {@code this} query.
+     * Is used to execute DML query immediately after (inside) DDL.
+     * e.g.:
+     *      CREATE TABLE MY_TABLE AS SELECT ... FROM ...;
+     *
+     * @param schema new schema.
+     */
+    public RootQuery<RowT> childQuery(SchemaPlus schema) {
+        return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), exch, unregister, log);
+    }
+
+    /** */
+    public BaseQueryContext context() {
+        return ctx;
+    }
+
+    /** */
+    public String sql() {
+        return sql;
+    }
+
+    /** */
+    public Object[] parameters() {
+        return params;
+    }
+
+    /**
+     * Starts maping phase for the query.
+     */
+    public void mapping() {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            state = QueryState.MAPPING;
+        }
+    }
+
+    /**
+     * Starts execution phase for the query and setup remote fragments.
+     */
+    public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            RootNode<RowT> rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose);
+            rootNode.register(root);
+
+            addFragment(new RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx));
+
+            this.root = rootNode;
+
+            for (int i = 1; i < plan.fragments().size(); i++) {
+                Fragment fragment = plan.fragments().get(i);
+                List<UUID> nodes = plan.mapping(fragment).nodeIds();
+
+                remotes.addAll(nodes);
+
+                for (UUID node : nodes)
+                    waiting.add(new RemoteFragmentKey(node, fragment.fragmentId()));
+            }
+
+            state = QueryState.EXECUTING;
+        }
+    }
+
+    /**
+     * Can be called multiple times after receive each error
+     * at {@link #onResponse(RemoteFragmentKey, Throwable)}.
+     */
+    @Override protected void tryClose() {
+        QueryState state0 = null;
+
+        synchronized (mux) {
+            if (state == QueryState.CLOSED)
+                return;
+
+            if (state == QueryState.INITED || state == QueryState.PLANNING || state == QueryState.MAPPING) {
+                state = QueryState.CLOSED;
+
+                return;
+            }
+
+            if (state == QueryState.EXECUTING) {
+                state0 = state = QueryState.CLOSING;
+
+                root.closeInternal();
+            }
+
+            if (state == QueryState.CLOSING && waiting.isEmpty())
+                state0 = state = QueryState.CLOSED;
+        }
+
+        if (state0 == QueryState.CLOSED) {
+            try {
+                IgniteException wrpEx = null;
+
+                for (UUID nodeId : remotes) {
+                    try {
+                        if (!nodeId.equals(root.context().localNodeId()))
+                            exch.closeQuery(nodeId, id());
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (wrpEx == null)
+                            wrpEx = new IgniteException("Failed to send cancel message. [nodeId=" + nodeId + ']', e);
+                        else
+                            wrpEx.addSuppressed(e);
+                    }
+                }
+
+                if (wrpEx != null)
+                    log.warning("An exception occures during the query cancel", wrpEx);
+            }
+            finally {
+                super.tryClose();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancel.cancel();
+
+        tryClose();
+    }
+
+    /** */
+    public PlanningContext planningContext() {
+        synchronized (mux) {
+            if (state == QueryState.CLOSED || state == QueryState.CLOSING) {
+                throw new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED
+                );
+            }
+
+            if (state == QueryState.EXECUTING || state == QueryState.MAPPING) {
+                throw new IgniteSQLException(
+                    "Invalid query flow",
+                    IgniteQueryErrorCode.UNKNOWN
+                );
+            }
+
+            if (pctx == null) {
+                state = QueryState.PLANNING;
+
+                pctx = PlanningContext.builder()
+                    .parentContext(ctx)
+                    .query(sql)
+                    .parameters(params)
+                    .build();
+
+                try {
+                    cancel.add(() -> pctx.unwrap(CancelFlag.class).requestCancel());
+                }
+                catch (QueryCancelledException e) {
+                    throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.QUERY_CANCELED, e);
+                }
+            }
+
+            return pctx;
+        }
+    }
+
+    /** */
+    public Iterator<RowT> iterator() {
+        return root;
+    }
+
+    /** */
+    @Override public void onNodeLeft(UUID nodeId) {
+        List<RemoteFragmentKey> fragments = null;
+
+        synchronized (mux) {
+            fragments = waiting.stream().filter(f -> f.nodeId().equals(nodeId)).collect(Collectors.toList());
+        }
+
+        if (!F.isEmpty(fragments)) {
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException(
+                "Failed to start query, node left. nodeId=" + nodeId);
+
+            for (RemoteFragmentKey fragment : fragments)
+                onResponse(fragment, ex);
+        }
+    }
+
+    /** */
+    public void onResponse(UUID nodeId, long fragmentId, Throwable error) {
+        onResponse(new RemoteFragmentKey(nodeId, fragmentId), error);
+    }
+
+    /** */
+    private void onResponse(RemoteFragmentKey fragment, Throwable error) {
+        QueryState state;
+        synchronized (mux) {
+            waiting.remove(fragment);
+
+            state = this.state;
+        }
+
+        if (error != null)
+            onError(error);
+        else if (state == QueryState.CLOSING)
+            tryClose();
+    }
+
+    /** */
+    public void onError(Throwable error) {
+        root.onError(error);
+
+        tryClose();
+    }
+
+    /** */
+    @Override public String toString() {
+        return S.toString(RootQuery.class, this);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java
new file mode 100644
index 0000000..2627e022
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RunningFragment.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Objects;
+
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+public class RunningFragment<Row> {
+    /** Relation tree of the fragment is used to generate fragment human-readable description. */
+    private final IgniteRel rootRel;
+
+    /** */
+    private final AbstractNode<Row> root;
+
+    /** */
+    private final ExecutionContext<Row> ectx;
+
+    /** */
+    public RunningFragment(
+        IgniteRel rootRel,
+        AbstractNode<Row> root,
+        ExecutionContext<Row> ectx) {
+        this.rootRel = rootRel;
+        this.root = root;
+        this.ectx = ectx;
+    }
+
+    /** */
+    public ExecutionContext<Row> context() {
+        return ectx;
+    }
+
+    /** */
+    public AbstractNode<Row> root() {
+        return root;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        RunningFragment<Row> fragment = (RunningFragment<Row>)o;
+
+        return Objects.equals(ectx, fragment.ectx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(ectx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(RunningFragment.class, this);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index ac1f58f..ffbf5bc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -63,10 +63,8 @@ public interface ExchangeService extends Service {
      * Sends cancel request.
      * @param nodeId Target node ID.
      * @param qryId Query ID.
-     * @param fragmentId Target fragment ID.
-     * @param exchangeId Exchange ID.
      */
-    void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException;
+    void closeQuery(UUID nodeId, UUID qryId) throws IgniteCheckedException;
 
     /**
      * @param nodeId Target node ID.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 6c1fca4..bd942552 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,20 +21,23 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+
 import com.google.common.collect.ImmutableMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.RunningQuery;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
-import org.apache.ignite.internal.processors.query.calcite.message.OutboxCloseMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
+import org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
@@ -57,6 +60,9 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
     /** */
     private MessageService msgSvc;
 
+    /** */
+    private QueryRegistry qryRegistry;
+
     /**
      * @param ctx Kernal context.
      */
@@ -108,6 +114,11 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
         return msgSvc;
     }
 
+    /** */
+    public void queryRegistry(QueryRegistry qryRegistry) {
+        this.qryRegistry = qryRegistry;
+    }
+
     /** {@inheritDoc} */
     @Override public <Row> void sendBatch(UUID nodeId, UUID qryId, long fragmentId, long exchangeId, int batchId,
         boolean last, List<Row> rows) throws IgniteCheckedException {
@@ -121,8 +132,8 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
     }
 
     /** {@inheritDoc} */
-    @Override public void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException {
-        messageService().send(nodeId, new OutboxCloseMessage(qryId, fragmentId, exchangeId));
+    @Override public void closeQuery(UUID nodeId, UUID qryId) throws IgniteCheckedException {
+        messageService().send(nodeId, new QueryCloseMessage(qryId));
     }
 
     /** {@inheritDoc} */
@@ -143,6 +154,7 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
         taskExecutor(proc.taskExecutor());
         mailboxRegistry(proc.mailboxRegistry());
         messageService(proc.messageService());
+        queryRegistry(proc.queryRegistry());
 
         init();
     }
@@ -150,9 +162,9 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
     /** {@inheritDoc} */
     @Override public void init() {
         messageService().register((n, m) -> onMessage(n, (InboxCloseMessage) m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
-        messageService().register((n, m) -> onMessage(n, (OutboxCloseMessage) m), MessageType.QUERY_OUTBOX_CANCEL_MESSAGE);
         messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage) m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE);
         messageService().register((n, m) -> onMessage(n, (QueryBatchMessage) m), MessageType.QUERY_BATCH_MESSAGE);
+        messageService().register((n, m) -> onMessage(n, (QueryCloseMessage) m), MessageType.QUERY_CLOSE_MESSAGE);
     }
 
     /** {@inheritDoc} */
@@ -178,22 +190,15 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
     }
 
     /** */
-    protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
-        Collection<Outbox<?>> outboxes = mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+    protected void onMessage(UUID nodeId, QueryCloseMessage msg) {
+        RunningQuery qry = qryRegistry.query(msg.queryId());
 
-        if (!F.isEmpty(outboxes)) {
-            for (Outbox<?> outbox : outboxes)
-                outbox.context().execute(outbox::close, outbox::onError);
-
-            for (Outbox<?> outbox : outboxes)
-                outbox.context().execute(outbox.context()::cancel, outbox::onError);
-        }
-        else if (log.isDebugEnabled()) {
-            log.debug("Stale oubox cancel message received: [" +
+        if (qry != null)
+            qry.cancel();
+        else {
+            log.warning("Stale query close message received: [" +
                 "nodeId=" + nodeId +
-                ", queryId=" + msg.queryId() +
-                ", fragmentId=" + msg.fragmentId() +
-                ", exchangeId=" + msg.exchangeId() + "]");
+                ", queryId=" + msg.queryId() + "]");
         }
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 45f6cea..2a94b21 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -34,7 +35,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
@@ -236,13 +236,6 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
         return unwrap(BaseQueryContext.class).typeFactory();
     }
 
-    /**
-     * @return Query cancel.
-     */
-    public GridQueryCancel queryCancel() {
-        return unwrap(BaseQueryContext.class).queryCancel();
-    }
-
     /** {@inheritDoc} */
     @Override public QueryProvider getQueryProvider() {
         return null; // TODO
@@ -301,7 +294,8 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
 
         executor.execute(qryId, fragmentId(), () -> {
             try {
-                task.run();
+                if (!isCancelled())
+                    task.run();
             }
             catch (Throwable e) {
                 onError.accept(e);
@@ -354,4 +348,21 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
     public boolean isCancelled() {
         return cancelFlag.get();
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ExecutionContext<?> context = (ExecutionContext<?>)o;
+
+        return qryId.equals(context.qryId) && fragmentDesc.fragmentId() == context.fragmentDesc.fragmentId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(qryId, fragmentDesc.fragmentId());
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
index 64e2f71..a3fbf2c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
@@ -18,32 +18,16 @@
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.List;
-import java.util.UUID;
 
 import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.calcite.RootQuery;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
-import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-public interface ExecutionService extends Service {
-    /**
-     * Executes a query.
-     *
-     * @param ctx Query external context, contains flags and connection settings like a locale or a timezone.
-     * @param schema Schema name.
-     * @param query Query.
-     * @param params Query parameters.
-     * @return Query cursor.
-     */
-    List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
-
-    /**
-     * Cancels a running query.
-     *
-     * @param queryId Query ID.
-     */
-    void cancelQuery(UUID queryId);
+public interface ExecutionService<Row> extends Service {
+    /** */
+    FieldsQueryCursor<List<?>> executePlan(RootQuery<Row> qry, QueryPlan plan);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 46e7678..41ef0b1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -17,39 +17,20 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.runtime.CalciteContextException;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlDdl;
-import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.ValidationException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -57,16 +38,18 @@ import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeMan
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.QueryCancellable;
-import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.Query;
+import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
+import org.apache.ignite.internal.processors.query.calcite.RootQuery;
+import org.apache.ignite.internal.processors.query.calcite.RunningFragment;
 import org.apache.ignite.internal.processors.query.calcite.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
 import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
@@ -85,44 +68,33 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadat
 import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadataImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
-import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepDmlPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PrepareServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
-import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate;
-import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
-import org.apache.ignite.internal.processors.query.calcite.prepare.ValidationResult;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.CreateTableCommand;
-import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DdlSqlToCommandConverter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.singletonList;
-import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
 import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
-import static org.apache.ignite.internal.processors.query.calcite.exec.PlannerHelper.optimize;
 import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
 
 /**
  *
  */
 @SuppressWarnings("TypeMayBeWeakened")
-public class ExecutionServiceImpl<Row> extends AbstractService implements ExecutionService {
+public class ExecutionServiceImpl<Row> extends AbstractService implements ExecutionService<Row> {
     /** */
     private final DiscoveryEventListener discoLsnr;
 
@@ -163,10 +135,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     private ExchangeService exchangeSvc;
 
     /** */
+    private PrepareServiceImpl prepareSvc;
+
+    /** */
     private ClosableIteratorsHolder iteratorsHolder;
 
     /** */
-    private final Map<UUID, QueryInfo> running;
+    private QueryRegistry qryReg;
 
     /** */
     private final RowHandler<Row> handler;
@@ -174,9 +149,6 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     /** */
     private final DdlCommandHandler ddlCmdHnd;
 
-    /** */
-    private final DdlSqlToCommandConverter ddlConverter;
-
     /**
      * @param ctx Kernal.
      */
@@ -185,11 +157,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
         this.handler = handler;
 
         discoLsnr = (e, c) -> onNodeLeft(e.eventNode().id());
-        running = new ConcurrentHashMap<>();
-        ddlConverter = new DdlSqlToCommandConverter();
 
         ddlCmdHnd = new DdlCommandHandler(
-            ctx::query, ctx.cache(), ctx.security(), () -> schemaHolder().schema()
+            ctx::query, ctx.cache(), ctx.security(), () -> schemaHolder().schema(null)
         );
     }
 
@@ -327,6 +297,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /**
+     * @param prepareSvc Prepare service.
+     */
+    public void prepareService(PrepareServiceImpl prepareSvc) {
+        this.prepareSvc = prepareSvc;
+    }
+
+    /**
      * @return Exchange service.
      */
     public ExchangeService exchangeService() {
@@ -375,46 +352,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
         return iteratorsHolder;
     }
 
-    /** {@inheritDoc} */
-    @Override public List<FieldsQueryCursor<List<?>>> executeQuery(
-        @Nullable QueryContext ctx,
-        String schema,
-        String qry,
-        Object[] params
-    ) {
-        QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(getDefaultSchema(schema).getName(), qry));
-        if (plan != null) {
-            PlanningContext pctx = createContext(ctx, schema, qry, params);
-
-            return Collections.singletonList(executePlan(UUID.randomUUID(), pctx, plan));
-        }
-
-        SqlNodeList qryList = Commons.parse(qry, FRAMEWORK_CONFIG.getParserConfig());
-        List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(qryList.size());
-
-        for (final SqlNode qry0: qryList) {
-            final PlanningContext pctx = createContext(ctx, schema, qry0.toString(), params);
-
-            if (qryList.size() == 1) {
-                plan = queryPlanCache().queryPlan(
-                    new CacheKey(pctx.schemaName(), pctx.query()),
-                    () -> prepareSingle(qry0, pctx));
-            }
-            else
-                plan = prepareSingle(qry0, pctx);
-
-            cursors.add(executePlan(UUID.randomUUID(), pctx, plan));
-        }
-
-        return cursors;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancelQuery(UUID qryId) {
-        QueryInfo info = running.get(qryId);
-
-        if (info != null)
-            info.doCancel();
+    /** */
+    public void queryRegistry(QueryRegistry qryReg) {
+        this.qryReg = qryReg;
     }
 
     /** {@inheritDoc} */
@@ -436,6 +376,8 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
         mappingService(proc.mappingService());
         messageService(proc.messageService());
         exchangeService(proc.exchangeService());
+        queryRegistry(proc.queryRegistry());
+        prepareService(proc.prepareService());
 
         init();
      }
@@ -455,8 +397,6 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     @Override public void tearDown() {
         eventManager().removeDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
-        running.clear();
-
         iteratorsHolder().tearDown();
     }
 
@@ -466,17 +406,12 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
-    private PlanningContext createContext(QueryContext ctx, @Nullable String schema, String qry, Object[] params) {
-        return createContext(Commons.convert(ctx), schema, qry, params);
-    }
-
-    /** */
     private BaseQueryContext createQueryContext(Context parent, @Nullable String schema) {
         return BaseQueryContext.builder()
             .parentContext(parent)
             .frameworkConfig(
                 Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                    .defaultSchema(getDefaultSchema(schema))
+                    .defaultSchema(schemaHolder().schema(schema))
                     .build()
             )
             .logger(log)
@@ -484,148 +419,20 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
-    private PlanningContext createContext(Context parent, @Nullable String schema, String qry, Object[] params) {
-        return PlanningContext.builder()
-            .parentContext(createQueryContext(parent, schema))
-            .query(qry)
-            .parameters(params)
-            .build();
-    }
-
-    /** */
-    private SchemaPlus getDefaultSchema(String schema) {
-        return schema != null ? schemaHolder().schema().getSubSchema(schema) : schemaHolder().schema();
-    }
-
-    /** */
     private QueryPlan prepareFragment(BaseQueryContext ctx, String jsonFragment) {
         return new FragmentPlan(fromJson(ctx, jsonFragment));
     }
 
-    /** */
-    private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) {
-        try {
-            assert single(sqlNode);
-
-            ctx.planner().reset();
-
-            if (SqlKind.DDL.contains(sqlNode.getKind()))
-                return prepareDdl(sqlNode, ctx);
-
-            switch (sqlNode.getKind()) {
-                case SELECT:
-                case ORDER_BY:
-                case WITH:
-                case VALUES:
-                case UNION:
-                case EXCEPT:
-                case INTERSECT:
-                    return prepareQuery(sqlNode, ctx);
-
-                case INSERT:
-                case DELETE:
-                case UPDATE:
-                    return prepareDml(sqlNode, ctx);
-
-                case EXPLAIN:
-                    return prepareExplain(sqlNode, ctx);
-
-                default:
-                    throw new IgniteSQLException("Unsupported operation [" +
-                        "sqlNodeKind=" + sqlNode.getKind() + "; " +
-                        "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-            }
-        }
-        catch (ValidationException | CalciteContextException e) {
-            throw new IgniteSQLException("Failed to validate query: " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
-        }
-    }
-
-    /** */
-    private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
-        IgnitePlanner planner = ctx.planner();
-
-        // Validate
-        ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
-
-        sqlNode = validated.sqlNode();
-
-        IgniteRel igniteRel = optimize(sqlNode, planner, log);
-
-        return new MultiStepQueryPlan(queryTemplate(igniteRel),
-            queryFieldsMetadata(ctx, validated.dataType(), validated.origins()));
-    }
-
-    /** */
-    private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
-        IgnitePlanner planner = ctx.planner();
-
-        // Validate
-        sqlNode = planner.validate(sqlNode);
-
-        // Convert to Relational operators graph
-        IgniteRel igniteRel = optimize(sqlNode, planner, log);
-
-        return new MultiStepDmlPlan(queryTemplate(igniteRel),
-            queryFieldsMetadata(ctx, igniteRel.getRowType(), null));
-    }
-
-    /** */
-    private QueryPlan prepareDdl(SqlNode sqlNode, PlanningContext ctx) {
-        assert sqlNode instanceof SqlDdl : sqlNode == null ? "null" : sqlNode.getClass().getName();
-
-        return new DdlPlan(ddlConverter.convert((SqlDdl)sqlNode, ctx));
-    }
-
-    /** */
-    private QueryPlan prepareExplain(SqlNode explain, PlanningContext ctx) throws ValidationException {
-        IgnitePlanner planner = ctx.planner();
-
-        SqlNode sql = ((SqlExplain)explain).getExplicandum();
-
-        // Validate
-        sql = planner.validate(sql);
-
-        // Convert to Relational operators graph
-        IgniteRel igniteRel = optimize(sql, planner, log);
-
-        String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
-
-        return new ExplainPlan(plan, explainFieldsMetadata(ctx));
-    }
-
-    /** */
-    private QueryTemplate queryTemplate(IgniteRel rel) {
-        // Split query plan to query fragments.
-        List<Fragment> fragments = new Splitter().go(rel);
-
-        return new QueryTemplate(mappingSvc, fragments);
-    }
-
-    /** */
-    private FieldsMetadata explainFieldsMetadata(PlanningContext ctx) {
-        IgniteTypeFactory factory = ctx.typeFactory();
-        RelDataType planStrDataType =
-            factory.createSqlType(SqlTypeName.VARCHAR, PRECISION_NOT_SPECIFIED);
-        T2<String, RelDataType> planField = new T2<>(ExplainPlan.PLAN_COL_NAME, planStrDataType);
-        RelDataType planDataType = factory.createStructType(singletonList(planField));
-
-        return queryFieldsMetadata(ctx, planDataType, null);
-    }
-
-    /** */
-    private FieldsQueryCursor<List<?>> executePlan(
-        UUID qryId,
-        PlanningContext pctx,
+    /** {@inheritDoc} */
+    @Override public FieldsQueryCursor<List<?>> executePlan(
+        RootQuery<Row> qry,
         QueryPlan plan
     ) {
         switch (plan.type()) {
             case DML:
                 ListFieldsQueryCursor<?> cur = mapAndExecutePlan(
-                    qryId,
-                    (MultiStepPlan)plan,
-                    pctx.unwrap(BaseQueryContext.class),
-                    pctx.parameters()
+                    qry,
+                    (MultiStepPlan)plan
                 );
 
                 cur.iterator().hasNext();
@@ -634,17 +441,15 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
 
             case QUERY:
                 return mapAndExecutePlan(
-                    qryId,
-                    (MultiStepPlan)plan,
-                    pctx.unwrap(BaseQueryContext.class),
-                    pctx.parameters()
+                    qry,
+                    (MultiStepPlan)plan
                 );
 
             case EXPLAIN:
-                return executeExplain((ExplainPlan)plan, pctx);
+                return executeExplain((ExplainPlan)plan);
 
             case DDL:
-                return executeDdl(qryId, (DdlPlan)plan, pctx);
+                return executeDdl(qry, (DdlPlan)plan);
 
             default:
                 throw new AssertionError("Unexpected plan type: " + plan);
@@ -652,29 +457,29 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
-    private FieldsQueryCursor<List<?>> executeDdl(UUID qryId, DdlPlan plan, PlanningContext pctx) {
+    private FieldsQueryCursor<List<?>> executeDdl(RootQuery<Row> qry, DdlPlan plan) {
         try {
-            ddlCmdHnd.handle(qryId, plan.command(), pctx);
+            ddlCmdHnd.handle(qry.id(), plan.command());
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + pctx.query() +
+            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.sql() +
                 ", err=" + e.getMessage() + ']', e);
         }
+        finally {
+            qryReg.unregister(qry.id());
+        }
 
-        if (plan.command() instanceof CreateTableCommand && ((CreateTableCommand)plan.command()).insertStatement() != null) {
-            SqlInsert insertStmt = ((CreateTableCommand)plan.command()).insertStatement();
+        if (plan.command() instanceof CreateTableCommand
+            && ((CreateTableCommand)plan.command()).insertStatement() != null) {
+            RootQuery<Row> insQry = qry.childQuery(schemaHolder.schema(qry.context().schemaName()));
 
-            try {
-                // Create new planning context containing created table in the schema.
-                PlanningContext dmlCtx = createContext(pctx, pctx.schemaName(), pctx.query(), pctx.parameters());
+            qryReg.register(insQry);
 
-                QueryPlan dmlPlan = prepareDml(insertStmt, dmlCtx);
+            SqlInsert insertStmt = ((CreateTableCommand)plan.command()).insertStatement();
 
-                return executePlan(qryId, dmlCtx, dmlPlan);
-            }
-            catch (ValidationException e) {
-                throw new IgniteSQLException("Failed to validate query.", IgniteQueryErrorCode.PARSING, e);
-            }
+            QueryPlan dmlPlan = prepareSvc.prepareSingle(insertStmt, insQry.planningContext());
+
+            return executePlan(insQry, dmlPlan);
         }
         else
             return H2Utils.zeroCursor();
@@ -682,13 +487,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
 
     /** */
     private ListFieldsQueryCursor<?> mapAndExecutePlan(
-        UUID qryId,
-        MultiStepPlan plan,
-        BaseQueryContext qctx,
-        Object[] params
+        RootQuery<Row> qry,
+        MultiStepPlan plan
     ) {
+        qry.mapping();
+
         MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion());
-        plan.init(mapCtx);
+        plan.init(mappingSvc, mapCtx);
 
         List<Fragment> fragments = plan.fragments();
 
@@ -714,23 +519,20 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
             plan.remotes(fragment));
 
         ExecutionContext<Row> ectx = new ExecutionContext<>(
-            qctx,
+            qry.context(),
             taskExecutor(),
-            qryId,
+            qry.id(),
             locNodeId,
             locNodeId,
             mapCtx.topologyVersion(),
             fragmentDesc,
             handler,
-            Commons.parametersMap(params));
+            Commons.parametersMap(qry.parameters()));
 
         Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
             exchangeService(), failureProcessor()).go(fragment.root());
 
-        QueryInfo info = new QueryInfo(ectx, plan, node);
-
-        // register query
-        register(info);
+        qry.run(ectx, plan, node);
 
         // start remote execution
         for (int i = 1; i < fragments.size(); i++) {
@@ -744,51 +546,54 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
             Throwable ex = null;
             for (UUID nodeId : fragmentDesc.nodeIds()) {
                 if (ex != null)
-                    info.onResponse(nodeId, fragment.fragmentId(), ex);
+                    qry.onResponse(nodeId, fragment.fragmentId(), ex);
                 else {
                     try {
                         QueryStartRequest req = new QueryStartRequest(
-                            qryId,
-                            qctx.schemaName(),
+                            qry.id(),
+                            qry.context().schemaName(),
                             fragment.serialized(),
                             ectx.topologyVersion(),
                             fragmentDesc,
-                            params);
+                            qry.parameters());
 
                         messageService().send(nodeId, req);
                     }
                     catch (Throwable e) {
-                        info.onResponse(nodeId, fragment.fragmentId(), ex = e);
+                        qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
                     }
                 }
             }
         }
 
-        return new ListFieldsQueryCursor<>(plan, info.iterator(), ectx);
+        return new ListFieldsQueryCursor<>(plan, iteratorsHolder().iterator(qry.iterator()), ectx);
     }
 
     /** */
-    private FieldsQueryCursor<List<?>> executeExplain(ExplainPlan plan, PlanningContext pctx) {
+    private FieldsQueryCursor<List<?>> executeExplain(ExplainPlan plan) {
         QueryCursorImpl<List<?>> cur = new QueryCursorImpl<>(singletonList(singletonList(plan.plan())));
-        cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(pctx.typeFactory()));
+        cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(Commons.typeFactory()));
 
         return cur;
     }
 
     /** */
-    private void executeFragment(UUID qryId, FragmentPlan plan, ExecutionContext<Row> ectx) {
+    private void executeFragment(Query<Row> qry, FragmentPlan plan, ExecutionContext<Row> ectx) {
         UUID origNodeId = ectx.originatingNodeId();
 
         Outbox<Row> node = new LogicalRelImplementor<>(
-                ectx,
-                partitionService(),
-                mailboxRegistry(),
-                exchangeService(),
-                failureProcessor())
-                .go(plan.root());
+            ectx,
+            partitionService(),
+            mailboxRegistry(),
+            exchangeService(),
+            failureProcessor()
+        )
+            .go(plan.root());
+
+        qry.addFragment(new RunningFragment<>(plan.root(), node, ectx));
 
         try {
-            messageService().send(origNodeId, new QueryStartResponse(qryId, ectx.fragmentId()));
+            messageService().send(origNodeId, new QueryStartResponse(qry.id(), ectx.fragmentId()));
         }
         catch (IgniteCheckedException e) {
             IgniteException wrpEx = new IgniteException("Failed to send reply. [nodeId=" + origNodeId + ']', e);
@@ -800,27 +605,6 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
-    private void register(QueryInfo info) {
-        UUID qryId = info.ctx.queryId();
-
-        running.put(qryId, info);
-
-        GridQueryCancel qryCancel = info.ctx.queryCancel();
-
-        if (qryCancel == null)
-            return;
-
-        try {
-            qryCancel.add(info);
-        }
-        catch (QueryCancelledException e) {
-            running.remove(qryId);
-
-            throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.QUERY_CANCELED);
-        }
-    }
-
-    /** */
     private FieldsMetadata queryFieldsMetadata(PlanningContext ctx, RelDataType sqlType,
         @Nullable List<List<String>> origins) {
         RelDataType resultType = TypeUtils.getResultType(
@@ -829,15 +613,21 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
-    private boolean single(SqlNode sqlNode) {
-        return !(sqlNode instanceof SqlNodeList);
-    }
-
-    /** */
     private void onMessage(UUID nodeId, final QueryStartRequest msg) {
         assert nodeId != null && msg != null;
 
         try {
+            Query<Row> qry = (Query<Row>)qryReg.register(
+                new Query<>(
+                    msg.queryId(),
+                    nodeId,
+                    null,
+                    exchangeSvc,
+                    (q) -> qryReg.unregister(q.id()),
+                    log
+                )
+            );
+
             final BaseQueryContext qctx = createQueryContext(Contexts.empty(), msg.schema());
 
             QueryPlan qryPlan = queryPlanCache().queryPlan(
@@ -859,7 +649,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
                 Commons.parametersMap(msg.parameters())
             );
 
-            executeFragment(msg.queryId(), (FragmentPlan)qryPlan, ectx);
+            executeFragment(qry, (FragmentPlan)qryPlan, ectx);
         }
         catch (Throwable ex) {
             U.error(log, "Failed to start query fragment ", ex);
@@ -879,6 +669,10 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
 
                 e.addSuppressed(ex);
 
+                Query<Row> qry = (Query<Row>)qryReg.query(msg.queryId());
+
+                qry.cancel();
+
                 throw wrpEx;
             }
 
@@ -890,224 +684,41 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     private void onMessage(UUID nodeId, QueryStartResponse msg) {
         assert nodeId != null && msg != null;
 
-        QueryInfo info = running.get(msg.queryId());
+        RunningQuery qry = qryReg.query(msg.queryId());
+
+        if (qry != null) {
+            assert qry instanceof RootQuery : "Unexpected query object: " + qry;
 
-        if (info != null)
-            info.onResponse(nodeId, msg.fragmentId(), msg.error());
+            ((RootQuery<Row>)qry).onResponse(nodeId, msg.fragmentId(), msg.error());
+        }
     }
 
     /** */
     private void onMessage(UUID nodeId, ErrorMessage msg) {
         assert nodeId != null && msg != null;
 
-        QueryInfo info = running.get(msg.queryId());
-
-        if (info != null)
-            info.onError(new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error()));
-    }
+        RunningQuery qry = qryReg.query(msg.queryId());
 
-    /** */
-    private void onNodeLeft(UUID nodeId) {
-        running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(nodeId));
-    }
-
-    /** */
-    private enum QueryState {
-        /** */
-        RUNNING,
+        if (qry != null && qry.state() != QueryState.CLOSED) {
+            assert qry instanceof RootQuery : "Unexpected query object: " + qry;
 
-        /** */
-        CLOSING,
+            Exception e = new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error());
 
-        /** */
-        CLOSED
-    }
-
-    /** */
-    private static final class RemoteFragmentKey {
-        /** */
-        private final UUID nodeId;
-
-        /** */
-        private final long fragmentId;
-
-        /** */
-        private RemoteFragmentKey(UUID nodeId, long fragmentId) {
-            this.nodeId = nodeId;
-            this.fragmentId = fragmentId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            RemoteFragmentKey that = (RemoteFragmentKey) o;
-
-            if (fragmentId != that.fragmentId)
-                return false;
-            return nodeId.equals(that.nodeId);
-        }
+            if (X.hasCause(msg.error(), ExecutionCancelledException.class)) {
+                e = new IgniteSQLException(
+                    "The query was cancelled while executing.",
+                    IgniteQueryErrorCode.QUERY_CANCELED,
+                    e
+                );
+            }
 
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-            res = 31 * res + (int) (fragmentId ^ (fragmentId >>> 32));
-            return res;
+            ((RootQuery<Row>)qry).onError(e);
         }
     }
 
     /** */
-    @SuppressWarnings("TypeMayBeWeakened")
-    private final class QueryInfo implements QueryCancellable {
-        /** */
-        private final ExecutionContext<Row> ctx;
-
-        /** */
-        private final RootNode<Row> root;
-
-        /** remote nodes */
-        private final Set<UUID> remotes;
-
-        /** node to fragment */
-        private final Set<RemoteFragmentKey> waiting;
-
-        /** */
-        private volatile QueryState state;
-
-        /** */
-        private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan, Node<Row> root) {
-            this.ctx = ctx;
-
-            RootNode<Row> rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose);
-            rootNode.register(root);
-
-            this.root = rootNode;
-
-            remotes = new HashSet<>();
-            waiting = new HashSet<>();
-
-            for (int i = 1; i < plan.fragments().size(); i++) {
-                Fragment fragment = plan.fragments().get(i);
-                List<UUID> nodes = plan.mapping(fragment).nodeIds();
-
-                remotes.addAll(nodes);
-
-                for (UUID node : nodes)
-                    waiting.add(new RemoteFragmentKey(node, fragment.fragmentId()));
-            }
-
-            state = QueryState.RUNNING;
-        }
-
-        /** */
-        public Iterator<Row> iterator() {
-            return iteratorsHolder().iterator(root);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void doCancel() {
-            root.close();
-        }
-
-        /**
-         * Can be called multiple times after receive each error at {@link #onResponse(RemoteFragmentKey, Throwable)}.
-         */
-        private void tryClose() {
-            QueryState state0 = null;
-
-            synchronized (this) {
-                if (state == QueryState.CLOSED)
-                    return;
-
-                if (state == QueryState.RUNNING)
-                    state0 = state = QueryState.CLOSING;
-
-                // 1) close local fragment
-                root.closeInternal();
-
-                if (state == QueryState.CLOSING && waiting.isEmpty())
-                    state0 = state = QueryState.CLOSED;
-            }
-
-            if (state0 == QueryState.CLOSED) {
-                // 2) unregister runing query
-                running.remove(ctx.queryId());
-
-                IgniteException wrpEx = null;
-
-                // 3) close remote fragments
-                for (UUID nodeId : remotes) {
-                    try {
-                        exchangeService().closeOutbox(nodeId, ctx.queryId(), -1, -1);
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (wrpEx == null)
-                            wrpEx = new IgniteException("Failed to send cancel message. [nodeId=" + nodeId + ']', e);
-                        else
-                            wrpEx.addSuppressed(e);
-                    }
-                }
-
-                // 4) Cancel local fragment
-                root.context().execute(ctx::cancel, root::onError);
-
-                if (wrpEx != null)
-                    throw wrpEx;
-            }
-        }
-
-        /** */
-        private void onNodeLeft(UUID nodeId) {
-            List<RemoteFragmentKey> fragments = null;
-
-            synchronized (this) {
-                for (RemoteFragmentKey fragment : waiting) {
-                    if (!fragment.nodeId.equals(nodeId))
-                        continue;
-
-                    if (fragments == null)
-                        fragments = new ArrayList<>();
-
-                    fragments.add(fragment);
-                }
-            }
-
-            if (!F.isEmpty(fragments)) {
-                ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException(
-                    "Failed to start query, node left. nodeId=" + nodeId);
-
-                for (RemoteFragmentKey fragment : fragments)
-                    onResponse(fragment, ex);
-            }
-        }
-
-        /** */
-        private void onResponse(UUID nodeId, long fragmentId, Throwable error) {
-            onResponse(new RemoteFragmentKey(nodeId, fragmentId), error);
-        }
-
-        /** */
-        private void onResponse(RemoteFragmentKey fragment, Throwable error) {
-            QueryState state;
-            synchronized (this) {
-                waiting.remove(fragment);
-                state = this.state;
-            }
-
-            if (error != null)
-                onError(error);
-            else if (state == QueryState.CLOSING)
-                tryClose();
-        }
-
-        /** */
-        private void onError(Throwable error) {
-            root.onError(error);
-
-            tryClose();
-        }
+    private void onNodeLeft(UUID nodeId) {
+        qryReg.runningQueries()
+            .forEach((qry) -> ((Query<Row>)qry).onNodeLeft(nodeId));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
index eedd0bc..f9ce1b0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Supplier;
+
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.ignite.IgniteCheckedException;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryEntityEx;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.AlterTableAddCommand;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.AlterTableDropCommand;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.ColumnDefinition;
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DropTable
 import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.NativeCommandWrapper;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import org.apache.ignite.internal.processors.security.IgniteSecurity;
 import org.apache.ignite.internal.util.typedef.F;
@@ -95,27 +96,27 @@ public class DdlCommandHandler {
     }
 
     /** */
-    public void handle(UUID qryId, DdlCommand cmd, PlanningContext pctx) throws IgniteCheckedException {
+    public void handle(UUID qryId, DdlCommand cmd) throws IgniteCheckedException {
         try {
             if (cmd instanceof CreateTableCommand)
-                handle0(pctx, (CreateTableCommand)cmd);
+                handle0((CreateTableCommand)cmd);
 
             else if (cmd instanceof DropTableCommand)
-                handle0(pctx, (DropTableCommand)cmd);
+                handle0((DropTableCommand)cmd);
 
             else if (cmd instanceof AlterTableAddCommand)
-                handle0(pctx, (AlterTableAddCommand)cmd);
+                handle0((AlterTableAddCommand)cmd);
 
             else if (cmd instanceof AlterTableDropCommand)
-                handle0(pctx, (AlterTableDropCommand)cmd);
+                handle0((AlterTableDropCommand)cmd);
 
             else if (cmd instanceof NativeCommandWrapper)
-                nativeCmdHnd.handle(qryId, (NativeCommandWrapper)cmd, pctx);
+                nativeCmdHnd.handle(qryId, (NativeCommandWrapper)cmd);
 
             else {
                 throw new IgniteSQLException("Unsupported DDL operation [" +
                     "cmdName=" + (cmd == null ? null : cmd.getClass().getSimpleName()) + "; " +
-                    "querySql=\"" + pctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+                    "cmd=\"" + cmd + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
             }
         }
         catch (SchemaOperationException e) {
@@ -124,7 +125,7 @@ public class DdlCommandHandler {
     }
 
     /** */
-    private void handle0(PlanningContext pctx, CreateTableCommand cmd) throws IgniteCheckedException {
+    private void handle0(CreateTableCommand cmd) throws IgniteCheckedException {
         security.authorize(cmd.cacheName(), SecurityPermission.CACHE_CREATE);
 
         isDdlOnSchemaSupported(cmd.schemaName());
@@ -138,7 +139,7 @@ public class DdlCommandHandler {
 
         CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(cmd.tableName());
 
-        QueryEntity e = toQueryEntity(cmd, pctx);
+        QueryEntity e = toQueryEntity(cmd);
 
         ccfg.setQueryEntities(Collections.singleton(e));
         ccfg.setSqlSchema(cmd.schemaName());
@@ -178,7 +179,7 @@ public class DdlCommandHandler {
     }
 
     /** */
-    private void handle0(PlanningContext pctx, DropTableCommand cmd) throws IgniteCheckedException {
+    private void handle0(DropTableCommand cmd) throws IgniteCheckedException {
         isDdlOnSchemaSupported(cmd.schemaName());
 
         Table tbl = schemaSupp.get().getSubSchema(cmd.schemaName()).getTable(cmd.tableName());
@@ -198,7 +199,7 @@ public class DdlCommandHandler {
     }
 
     /** */
-    private void handle0(PlanningContext pctx, AlterTableAddCommand cmd) throws IgniteCheckedException {
+    private void handle0(AlterTableAddCommand cmd) throws IgniteCheckedException {
         isDdlOnSchemaSupported(cmd.schemaName());
 
         GridQueryTypeDescriptor typeDesc = schemaMgr.typeDescriptorForTable(cmd.schemaName(), cmd.tableName());
@@ -225,7 +226,7 @@ public class DdlCommandHandler {
                         continue;
                 }
 
-                Type javaType = pctx.typeFactory().getResultClass(col.type());
+                Type javaType = Commons.typeFactory().getResultClass(col.type());
 
                 String typeName = javaType instanceof Class ? ((Class<?>)javaType).getName() : javaType.getTypeName();
 
@@ -256,7 +257,7 @@ public class DdlCommandHandler {
     }
 
     /** */
-    private void handle0(PlanningContext pctx, AlterTableDropCommand cmd) throws IgniteCheckedException {
+    private void handle0(AlterTableDropCommand cmd) throws IgniteCheckedException {
         isDdlOnSchemaSupported(cmd.schemaName());
 
         GridQueryTypeDescriptor typeDesc = schemaMgr.typeDescriptorForTable(cmd.schemaName(), cmd.tableName());
@@ -308,7 +309,7 @@ public class DdlCommandHandler {
     }
 
     /** */
-    private QueryEntity toQueryEntity(CreateTableCommand cmd, PlanningContext pctx) {
+    private QueryEntity toQueryEntity(CreateTableCommand cmd) {
         QueryEntity res = new QueryEntity();
 
         res.setTableName(cmd.tableName());
@@ -320,7 +321,7 @@ public class DdlCommandHandler {
         Map<String, Integer> precision = new HashMap<>();
         Map<String, Integer> scale = new HashMap<>();
 
-        IgniteTypeFactory tf = pctx.typeFactory();
+        IgniteTypeFactory tf = Commons.typeFactory();
 
         for (ColumnDefinition col : cmd.columns()) {
             String name = col.name();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/NativeCommandHandler.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/NativeCommandHandler.java
index d2b0738..3cca9d6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/NativeCommandHandler.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/NativeCommandHandler.java
@@ -22,8 +22,6 @@ import java.util.UUID;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.GridQuerySchemaManager;
-import org.apache.ignite.internal.processors.query.SqlClientContext;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.NativeCommandWrapper;
 import org.apache.ignite.internal.sql.SqlCommandProcessor;
 
@@ -44,11 +42,10 @@ public class NativeCommandHandler {
     /**
      * @param qryId Query id.
      * @param cmd   Native command.
-     * @param pctx  Planning context.
      */
-    public FieldsQueryCursor<List<?>> handle(UUID qryId, NativeCommandWrapper cmd, PlanningContext pctx) {
+    public FieldsQueryCursor<List<?>> handle(UUID qryId, NativeCommandWrapper cmd) {
         assert proc.isCommandSupported(cmd.command()) : cmd.command();
 
-        return proc.runCommand(pctx.query(), cmd.command(), pctx.unwrap(SqlClientContext.class));
+        return proc.runCommand(cmd.command());
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index b2f036a..6b7ea84 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -150,10 +150,12 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing
     }
 
     /** {@inheritDoc} */
-    @Override protected void onErrorInternal(Throwable e) {
-        U.error(context().logger(),
-            "Error occurred during execution: " + X.getFullStackTrace(e));
+    @Override public void onError(Throwable e) {
+        onErrorInternal(e);
+    }
 
+    /** {@inheritDoc} */
+    @Override protected void onErrorInternal(Throwable e) {
         try {
             sendError(e);
         }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index e3c6a86..b330e12 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -45,7 +45,7 @@ public enum MessageType {
     QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new),
 
     /** */
-    QUERY_OUTBOX_CANCEL_MESSAGE(306, OutboxCloseMessage::new),
+    QUERY_CLOSE_MESSAGE(306, QueryCloseMessage::new),
 
     /** */
     GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
similarity index 58%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
index ba6021c..b80869b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
@@ -19,54 +19,31 @@ package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.nio.ByteBuffer;
 import java.util.UUID;
-
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
  */
-public class OutboxCloseMessage implements CalciteMessage {
-    /** */
-    private UUID queryId;
-
-    /** */
-    private long fragmentId;
-
+public class QueryCloseMessage implements CalciteMessage {
     /** */
-    private long exchangeId;
+    private UUID qryId;
 
     /** */
-    public OutboxCloseMessage() {
+    public QueryCloseMessage() {
         // No-op.
     }
 
     /** */
-    public OutboxCloseMessage(UUID queryId, long fragmentId, long exchangeId) {
-        this.queryId = queryId;
-        this.fragmentId = fragmentId;
-        this.exchangeId = exchangeId;
+    public QueryCloseMessage(UUID qryId) {
+        this.qryId = qryId;
     }
 
     /**
      * @return Query ID.
      */
     public UUID queryId() {
-        return queryId;
-    }
-
-    /**
-     * @return Fragment ID.
-     */
-    public long fragmentId() {
-        return fragmentId;
-    }
-
-    /**
-     * @return Exchange ID.
-     */
-    public long exchangeId() {
-        return exchangeId;
+        return qryId;
     }
 
     /** {@inheritDoc} */
@@ -82,23 +59,10 @@ public class OutboxCloseMessage implements CalciteMessage {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeLong("exchangeId", exchangeId))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("fragmentId", fragmentId))
+                if (!writer.writeUuid("queryId", qryId))
                     return false;
 
                 writer.incrementState();
-
-            case 2:
-                if (!writer.writeUuid("queryId", queryId))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -113,23 +77,7 @@ public class OutboxCloseMessage implements CalciteMessage {
 
         switch (reader.state()) {
             case 0:
-                exchangeId = reader.readLong("exchangeId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                fragmentId = reader.readLong("fragmentId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                queryId = reader.readUuid("queryId");
+                qryId = reader.readUuid("queryId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -138,16 +86,16 @@ public class OutboxCloseMessage implements CalciteMessage {
 
         }
 
-        return reader.afterMessageRead(OutboxCloseMessage.class);
+        return reader.afterMessageRead(QueryCloseMessage.class);
     }
 
     /** {@inheritDoc} */
     @Override public MessageType type() {
-        return MessageType.QUERY_OUTBOX_CANCEL_MESSAGE;
+        return MessageType.QUERY_CLOSE_MESSAGE;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 3;
+        return 1;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 7fb55d8..76318e9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.util.typedef.F;
@@ -88,8 +89,8 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan {
     }
 
     /** {@inheritDoc} */
-    @Override public void init(MappingQueryContext ctx) {
-        executionPlan = queryTemplate.map(ctx);
+    @Override public void init(MappingService mappingService, MappingQueryContext ctx) {
+        executionPlan = queryTemplate.map(mappingService, ctx);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DdlPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DdlPlan.java
index 79d3880..c6302e5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DdlPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DdlPlan.java
@@ -43,4 +43,9 @@ public class DdlPlan implements QueryPlan {
     @Override public QueryPlan copy() {
         return this;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return cmd.toString();
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
index c370afe..fc27362 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 
 /**
  * Regular query or DML
@@ -54,5 +55,5 @@ public interface MultiStepPlan extends QueryPlan {
      *
      * @param ctx Planner context.
      */
-    void init(MappingQueryContext ctx);
+    void init(MappingService mappingService, MappingQueryContext ctx);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerHelper.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerHelper.java
index 0872969..f46610a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerHelper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.exec;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -33,9 +33,6 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
-import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index cde49b9..36c762d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
@@ -28,6 +29,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.RuleSet;
+import org.apache.calcite.util.CancelFlag;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.jetbrains.annotations.NotNull;
 
@@ -45,6 +47,9 @@ public final class PlanningContext implements Context {
     private final Object[] parameters;
 
     /** */
+    private final CancelFlag cancelFlag = new CancelFlag(new AtomicBoolean());
+
+    /** */
     private Function<RuleSet, RuleSet> rulesFilter;
 
     /** */
@@ -144,6 +149,9 @@ public final class PlanningContext implements Context {
         if (aCls == getClass())
             return aCls.cast(this);
 
+        if (aCls == CancelFlag.class)
+            return aCls.cast(cancelFlag);
+
         return parentCtx.unwrap(aCls);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareService.java
similarity index 78%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareService.java
index 311ef65..7d6f75d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareService.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
 
 /**
  *
  */
-public interface SchemaHolder extends Service {
+public interface PrepareService extends Service {
     /**
-     * @return Schema.
+     * Prepare query plan.
      */
-    SchemaPlus schema();
+    QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
new file mode 100644
index 0000000..a02a715
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlDdl;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.singletonList;
+import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper.optimize;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+public class PrepareServiceImpl extends AbstractService implements PrepareService {
+    /** */
+    private final DdlSqlToCommandConverter ddlConverter;
+
+    /**
+     * @param ctx Kernal.
+     */
+    public PrepareServiceImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        ddlConverter = new DdlSqlToCommandConverter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onStart(GridKernalContext ctx) {
+        super.onStart(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) {
+        try {
+            assert single(sqlNode);
+
+            ctx.planner().reset();
+
+            if (SqlKind.DDL.contains(sqlNode.getKind()))
+                return prepareDdl(sqlNode, ctx);
+
+            switch (sqlNode.getKind()) {
+                case SELECT:
+                case ORDER_BY:
+                case WITH:
+                case VALUES:
+                case UNION:
+                case EXCEPT:
+                case INTERSECT:
+                    return prepareQuery(sqlNode, ctx);
+
+                case INSERT:
+                case DELETE:
+                case UPDATE:
+                    return prepareDml(sqlNode, ctx);
+
+                case EXPLAIN:
+                    return prepareExplain(sqlNode, ctx);
+
+                default:
+                    throw new IgniteSQLException("Unsupported operation [" +
+                        "sqlNodeKind=" + sqlNode.getKind() + "; " +
+                        "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+            }
+        }
+        catch (ValidationException | CalciteContextException e) {
+            throw new IgniteSQLException("Failed to validate query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
+        }
+    }
+
+    /**
+     *
+     */
+    private QueryPlan prepareDdl(SqlNode sqlNode, PlanningContext ctx) {
+        assert sqlNode instanceof SqlDdl : sqlNode == null ? "null" : sqlNode.getClass().getName();
+
+        return new DdlPlan(ddlConverter.convert((SqlDdl)sqlNode, ctx));
+    }
+
+    /**
+     *
+     */
+    private QueryPlan prepareExplain(SqlNode explain, PlanningContext ctx) throws ValidationException {
+        IgnitePlanner planner = ctx.planner();
+
+        SqlNode sql = ((SqlExplain)explain).getExplicandum();
+
+        // Validate
+        sql = planner.validate(sql);
+
+        // Convert to Relational operators graph
+        IgniteRel igniteRel = optimize(sql, planner, log);
+
+        String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
+
+        return new ExplainPlan(plan, explainFieldsMetadata(ctx));
+    }
+
+    /** */
+    private boolean single(SqlNode sqlNode) {
+        return !(sqlNode instanceof SqlNodeList);
+    }
+
+    /** */
+    private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
+        IgnitePlanner planner = ctx.planner();
+
+        // Validate
+        ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
+
+        sqlNode = validated.sqlNode();
+
+        IgniteRel igniteRel = optimize(sqlNode, planner, log);
+
+        // Split query plan to query fragments.
+        List<Fragment> fragments = new Splitter().go(igniteRel);
+
+        QueryTemplate template = new QueryTemplate(fragments);
+
+        return new MultiStepQueryPlan(template, queryFieldsMetadata(ctx, validated.dataType(), validated.origins()));
+    }
+
+    /** */
+    private QueryPlan prepareDml(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
+        IgnitePlanner planner = ctx.planner();
+
+        // Validate
+        sqlNode = planner.validate(sqlNode);
+
+        // Convert to Relational operators graph
+        IgniteRel igniteRel = optimize(sqlNode, planner, log);
+
+        // Split query plan to query fragments.
+        List<Fragment> fragments = new Splitter().go(igniteRel);
+
+        QueryTemplate template = new QueryTemplate(fragments);
+
+        return new MultiStepDmlPlan(template, queryFieldsMetadata(ctx, igniteRel.getRowType(), null));
+    }
+
+    /** */
+    private FieldsMetadata queryFieldsMetadata(PlanningContext ctx, RelDataType sqlType,
+        @Nullable List<List<String>> origins) {
+        RelDataType resultType = TypeUtils.getResultType(
+            ctx.typeFactory(), ctx.catalogReader(), sqlType, origins);
+
+        return new FieldsMetadataImpl(resultType, origins);
+    }
+
+    /** */
+    private FieldsMetadata explainFieldsMetadata(PlanningContext ctx) {
+        IgniteTypeFactory factory = ctx.typeFactory();
+        RelDataType planStrDataType =
+            factory.createSqlType(SqlTypeName.VARCHAR, PRECISION_NOT_SPECIFIED);
+        T2<String, RelDataType> planField = new T2<>(ExplainPlan.PLAN_COL_NAME, planStrDataType);
+        RelDataType planDataType = factory.createStructType(singletonList(planField));
+
+        return queryFieldsMetadata(ctx, planDataType, null);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
index 12c00f8..ac8bbc4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -37,18 +38,13 @@ import org.jetbrains.annotations.NotNull;
 /** */
 public class QueryTemplate {
     /** */
-    private final MappingService mappingService;
-
-    /** */
     private final ImmutableList<Fragment> fragments;
 
     /** */
     private final AtomicReference<ExecutionPlan> executionPlan = new AtomicReference<>();
 
     /** */
-    public QueryTemplate(MappingService mappingService, List<Fragment> fragments) {
-        this.mappingService = mappingService;
-
+    public QueryTemplate(List<Fragment> fragments) {
         ImmutableList.Builder<Fragment> b = ImmutableList.builder();
         for (Fragment fragment : fragments)
             b.add(fragment.copy());
@@ -57,7 +53,7 @@ public class QueryTemplate {
     }
 
     /** */
-    public ExecutionPlan map(MappingQueryContext ctx) {
+    public ExecutionPlan map(MappingService mappingService, MappingQueryContext ctx) {
         ExecutionPlan executionPlan = this.executionPlan.get();
 
         if (executionPlan != null && Objects.equals(executionPlan.topologyVersion(), ctx.topologyVersion()))
@@ -70,7 +66,7 @@ public class QueryTemplate {
 
         for (int i = 0; i < 3; i++) {
             try {
-                ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(fragments, ctx, mq));
+                ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(mappingService, fragments, ctx, mq));
 
                 if (executionPlan == null || executionPlan.topologyVersion().before(executionPlan0.topologyVersion()))
                     this.executionPlan.compareAndSet(executionPlan, executionPlan0);
@@ -91,7 +87,12 @@ public class QueryTemplate {
     }
 
     /** */
-    @NotNull private List<Fragment> map(List<Fragment> fragments, MappingQueryContext ctx, RelMetadataQuery mq) {
+    @NotNull private List<Fragment> map(
+        MappingService mappingService,
+        List<Fragment> fragments,
+        MappingQueryContext ctx,
+        RelMetadataQuery mq
+    ) {
         ImmutableList.Builder<Fragment> b = ImmutableList.builder();
 
         for (Fragment fragment : fragments)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/patch/AggregateExpandDistinctAggregatesRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/patch/AggregateExpandDistinctAggregatesRule.java
index f615bc8..d67c7ab 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/patch/AggregateExpandDistinctAggregatesRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/patch/AggregateExpandDistinctAggregatesRule.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
index 311ef65..2cb4eec 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.query.calcite.schema;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public interface SchemaHolder extends Service {
     /**
-     * @return Schema.
+     * @return Specified schema if the schema name is specified or default schema when {@code schema} is {@code null}.
      */
-    SchemaPlus schema();
+    SchemaPlus schema(@Nullable String schema);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 84c597b..1e618ba 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -153,11 +153,6 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
     }
 
     /** {@inheritDoc} */
-    @Override public SchemaPlus schema() {
-        return calciteSchema;
-    }
-
-    /** {@inheritDoc} */
     @Override public synchronized void onSchemaCreated(String schemaName) {
         igniteSchemas.putIfAbsent(schemaName, new IgniteSchema(schemaName));
         rebuild();
@@ -269,6 +264,11 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
         rebuild();
     }
 
+    /** {@inheritDoc} */
+    @Override public SchemaPlus schema(@Nullable String schema) {
+        return schema != null ? calciteSchema.getSubSchema(schema) : calciteSchema;
+    }
+
     /** */
     private void rebuild() {
         SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index aacefa0..d9d5e4d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -246,16 +246,16 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
                 assertEquals(1, res.size());
                 assertEquals(1, res.get(0).size());
                 assertEquals(40L, res.get(0).get(0));
+
+                awaitReservationsRelease("RISK");
+                awaitReservationsRelease("TRADE");
+                awaitReservationsRelease("BATCH");
+
+                assertFalse(lsnr.check());
             }
         }
 
-        assertFalse(lsnr.check());
-
         listeningLog.clearListeners();
-
-        awaitReservationsRelease("RISK");
-        awaitReservationsRelease("TRADE");
-        awaitReservationsRelease("BATCH");
     }
 
     /**
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
index be86124..899747d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static java.util.Collections.singletonList;
@@ -147,6 +148,9 @@ public class CancelTest extends GridCommonAbstractTest {
             fail("Unexpected exception: " + ex);
         }
 
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), 10_000));
+
         awaitReservationsRelease(grid(0), "TEST");
     }
 
@@ -168,6 +172,11 @@ public class CancelTest extends GridCommonAbstractTest {
 
         stopGrid(0);
 
+        QueryEngine engine1 = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine1.runningQueries().isEmpty(), 10_000));
+
         awaitReservationsRelease(grid(1), "TEST");
     }
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index e814c8c..4649703 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -485,6 +485,6 @@ public abstract class QueryChecker {
         }, PART_RELEASE_TIMEOUT);
 
         for (GridDhtLocalPartition p : parts)
-            assertEquals("Partition is reserved: " + p, 0, p.reservations());
+            assertEquals("Partition is reserved: [node=" + node.name() + ", part=" + p, 0, p.reservations());
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 588602e..b733b6f 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.List;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.QueryEntity;
@@ -131,6 +132,25 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
     }
 
     /** */
+    protected CalciteQueryProcessor queryProcessor(IgniteEx ignite) {
+        return Commons.lookupComponent(ignite.context(), CalciteQueryProcessor.class);
+    }
+
+    /** */
+    protected List<List<?>> sql(String sql, Object... params) {
+        return sql(client, sql, params);
+    }
+
+    /** */
+    protected List<List<?>> sql(IgniteEx ignite, String sql, Object... params) {
+        List<FieldsQueryCursor<List<?>>> cur = queryProcessor(ignite).query(null, "PUBLIC", sql, params);
+
+        try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
+            return srvCursor.getAll();
+        }
+    }
+
+    /** */
     public static class Employer {
         /** */
         @QuerySqlField
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
index c89bbab..5496b9c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
@@ -16,40 +16,29 @@
  */
 package org.apache.ignite.internal.processors.query.calcite.integration;
 
-import java.util.List;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.SqlConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.After;
-import org.junit.Before;
 
 /** */
-public class AbstractDdlIntegrationTest extends GridCommonAbstractTest {
-    /** */
-    private static final String CLIENT_NODE_NAME = "client";
-
+public class AbstractDdlIntegrationTest extends AbstractBasicIntegrationTest {
     /** */
     protected static final String DATA_REGION_NAME = "test_data_region";
 
     /** */
     protected static final String PERSISTENT_DATA_REGION = "pds_data_region";
 
-    /** */
-    protected IgniteEx client;
+    /** {@inheritDoc} */
+    @Override protected int nodeCount() {
+        return 1;
+    }
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
-        startGrids(2);
-
-        client = startClientGrid(CLIENT_NODE_NAME);
+        super.beforeTestsStarted();
 
         client.cluster().state(ClusterState.ACTIVE);
     }
@@ -77,33 +66,8 @@ public class AbstractDdlIntegrationTest extends GridCommonAbstractTest {
     }
 
     /** */
-    @Before
-    public void init() {
-        client = grid(CLIENT_NODE_NAME);
-    }
-
-    /** */
     @After
     public void cleanUp() {
         client.destroyCaches(client.cacheNames());
     }
-
-    /** */
-    protected List<List<?>> executeSql(String sql, Object... params) {
-        return executeSql(client, sql, params);
-    }
-
-    /** */
-    protected List<List<?>> executeSql(IgniteEx ignite, String sql, Object... params) {
-        List<FieldsQueryCursor<List<?>>> cur = queryProcessor(ignite).query(null, "PUBLIC", sql, params);
-
-        try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
-            return srvCursor.getAll();
-        }
-    }
-
-    /** */
-    private CalciteQueryProcessor queryProcessor(IgniteEx ignite) {
-        return Commons.lookupComponent(ignite.context(), CalciteQueryProcessor.class);
-    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexDdlIntegrationTest.java
index a7940a3..cc89fc8 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexDdlIntegrationTest.java
@@ -37,10 +37,8 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
     private static final String CACHE_NAME = "my_cache";
 
     /** {@inheritDoc} */
-    @Override public void init() {
-        super.init();
-
-        executeSql("create table my_table(id int, val_int int, val_str varchar) with cache_name=\"" + CACHE_NAME + "\"");
+    @Override protected void beforeTest() throws Exception {
+        sql("create table my_table(id int, val_int int, val_str varchar) with cache_name=\"" + CACHE_NAME + "\"");
     }
 
     /**
@@ -50,11 +48,11 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createDropIndexSimpleCase() {
         assertNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("create index my_index on my_table(id)");
+        sql("create index my_index on my_table(id)");
 
         assertNotNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("drop index my_index");
+        sql("drop index my_index");
 
         assertNull(findIndex(CACHE_NAME, "my_index"));
     }
@@ -66,20 +64,20 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createDropIndexWithSchema() {
         String cacheName = "cache2";
 
-        executeSql("create table my_schema.my_table2(id int) with cache_name=\"" + cacheName + "\"");
+        sql("create table my_schema.my_table2(id int) with cache_name=\"" + cacheName + "\"");
 
         assertNull(findIndex(cacheName, "my_index2"));
 
-        executeSql("create index my_index2 on my_schema.my_table2(id)");
+        sql("create index my_index2 on my_schema.my_table2(id)");
 
         assertNotNull(findIndex(cacheName, "my_index2"));
 
-        GridTestUtils.assertThrowsAnyCause(log, () -> executeSql("drop index my_index2"), IgniteSQLException.class,
+        GridTestUtils.assertThrowsAnyCause(log, () -> sql("drop index my_index2"), IgniteSQLException.class,
             "Index doesn't exist");
 
         assertNotNull(findIndex(cacheName, "my_index2"));
 
-        executeSql("drop index my_schema.my_index2");
+        sql("drop index my_schema.my_index2");
 
         assertNull(findIndex(cacheName, "my_index2"));
     }
@@ -91,14 +89,14 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createIndexWithIfNotExistsClause() {
         assertNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("create index if not exists my_index on my_table(id)");
+        sql("create index if not exists my_index on my_table(id)");
 
-        GridTestUtils.assertThrowsAnyCause(log, () -> executeSql("create index my_index on my_table(val_int)"),
+        GridTestUtils.assertThrowsAnyCause(log, () -> sql("create index my_index on my_table(val_int)"),
             IgniteSQLException.class, "Index already exists");
 
         assertNotNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("create index if not exists my_index on my_table(val_str)");
+        sql("create index if not exists my_index on my_table(val_str)");
 
         Index idx = findIndex(CACHE_NAME, "my_index");
 
@@ -116,17 +114,17 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void dropIndexWithIfExistsClause() {
         assertNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("create index my_index on my_table(id)");
+        sql("create index my_index on my_table(id)");
 
         assertNotNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("drop index if exists my_index");
+        sql("drop index if exists my_index");
 
         assertNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("drop index if exists my_index");
+        sql("drop index if exists my_index");
 
-        GridTestUtils.assertThrowsAnyCause(log, () -> executeSql("drop index my_index"), IgniteSQLException.class,
+        GridTestUtils.assertThrowsAnyCause(log, () -> sql("drop index my_index"), IgniteSQLException.class,
             "Index doesn't exist");
     }
 
@@ -137,7 +135,7 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createIndexWithColumnsOrdering() {
         assertNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("create index my_index on my_table(id, val_int asc, val_str desc)");
+        sql("create index my_index on my_table(id, val_int asc, val_str desc)");
 
         Index idx = findIndex(CACHE_NAME, "my_index");
 
@@ -161,7 +159,7 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createIndexWithInlineSize() {
         assertNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("create index my_index on my_table(val_str) inline_size 10");
+        sql("create index my_index on my_table(val_str) inline_size 10");
 
         Index idx = findIndex(CACHE_NAME, "my_index");
 
@@ -180,7 +178,7 @@ public class IndexDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createIndexWithParallel() {
         assertNull(findIndex(CACHE_NAME, "my_index"));
 
-        executeSql("create index my_index on my_table(val_str) parallel 10");
+        sql("create index my_index on my_table(val_str) parallel 10");
 
         assertNotNull(findIndex(CACHE_NAME, "my_index"));
     }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
index 120df5a..cbd9dc1 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
@@ -82,6 +82,11 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
             cache.put(i, i);
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // No-op.
+    }
+
     /** */
     @Override public void cleanUp() {
         // No-op.
@@ -105,7 +110,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
         long qryId = qryView.queryId();
         UUID originNodeId = qryView.originNodeId();
 
-        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+        sql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
 
         // Fetch all cached entries.
         for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
@@ -148,7 +153,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
 
             assertTrue(res);
 
-            executeSql(client, "KILL COMPUTE '" + jobViewHolder.get().id() + "'");
+            sql(client, "KILL COMPUTE '" + jobViewHolder.get().id() + "'");
 
             assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
         }
@@ -167,7 +172,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
         try (Transaction tx = client.transactions().txStart()) {
             cache.put(testKey, 1);
 
-            executeSql(client, "KILL TRANSACTION '" + tx.xid() + "'");
+            sql(client, "KILL TRANSACTION '" + tx.xid() + "'");
 
             assertThrowsWithCause(tx::commit, IgniteException.class);
         }
@@ -191,7 +196,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
         TestService svc = client.services().serviceProxy(serviceName, TestService.class, true);
         assertNotNull(svc);
 
-        executeSql(client, "KILL SERVICE '" + serviceName + "'");
+        sql(client, "KILL SERVICE '" + serviceName + "'");
 
         boolean res = waitForCondition(() -> grid(0).context().systemView().view(SVCS_VIEW).size() == 0, TIMEOUT);
         assertTrue(res);
@@ -232,7 +237,7 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
         UUID nodeId = cqView.nodeId();
         UUID routineId = cqView.routineId();
 
-        executeSql(client, "KILL CONTINUOUS '" + nodeId + "' '" + routineId + "'");
+        sql(client, "KILL CONTINUOUS '" + nodeId + "' '" + routineId + "'");
 
         long cnt = cntr.get();
 
@@ -247,31 +252,31 @@ public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
     /** */
     @Test
     public void testCancelUnknownScanQuery() {
-        executeSql(client, "KILL SCAN '" + client.localNode().id() + "' 'unknown' 1");
+        sql(client, "KILL SCAN '" + client.localNode().id() + "' 'unknown' 1");
     }
 
     /** */
     @Test
     public void testCancelUnknownComputeTask() {
-        executeSql(client, "KILL COMPUTE '" + IgniteUuid.randomUuid() + "'");
+        sql(client, "KILL COMPUTE '" + IgniteUuid.randomUuid() + "'");
     }
 
     /** */
     @Test
     public void testCancelUnknownService() {
-        executeSql(client, "KILL SERVICE 'unknown'");
+        sql(client, "KILL SERVICE 'unknown'");
     }
 
     /** */
     @Test
     public void testCancelUnknownTx() {
-        executeSql(client, "KILL TRANSACTION '" + IgniteUuid.randomUuid() + "'");
+        sql(client, "KILL TRANSACTION '" + IgniteUuid.randomUuid() + "'");
     }
 
     /** */
     @Test
     public void testCancelUnknownContinuousQuery() {
-        executeSql(client, "KILL CONTINUOUS '" + grid(0).localNode().id() + "' '" + UUID.randomUUID() + "'");
+        sql(client, "KILL CONTINUOUS '" + grid(0).localNode().id() + "' '" + UUID.randomUUID() + "'");
     }
 
     /** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
new file mode 100644
index 0000000..380a1c2
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ *
+ */
+public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static IgniteEx srv;
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 10_000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        srv = grid(0);
+    }
+
+    /**
+     * Execute query with a lot of JOINs to produce very long planning phase.
+     * Cancel query on planning phase and check query registry is empty on the all nodes of the cluster.
+     */
+    @Test
+    public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+        QueryEngine engine = queryProcessor(client);
+        int cnt = 9;
+
+        for (int i = 0; i < cnt; i++)
+            sql("CREATE TABLE test_tbl" + i + " (id int, val varchar)");
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "test_tbl" + i + " p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = engine.runningQueries();
+
+        assertEquals("Running: " + running, 1, running.size());
+
+        RunningQuery qry = F.first(running);
+
+        assertSame(qry, engine.runningQuery(qry.id()));
+
+        // Waits for planning.
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS));
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0), IgniteSQLException.class, "The query was cancelled while planning");
+    }
+
+    /**
+     * Execute query with a lot of JOINs to produce very long excution phase.
+     * Cancel query on execution phase and check query registry is empty on the all nodes of the cluster.
+     */
+    @Test
+    public void testCancelAtExecutionPhase() throws IgniteCheckedException {
+        QueryEngine cliEngine = queryProcessor(client);
+        QueryEngine srvEngine = queryProcessor(srv);
+        int cnt = 6;
+
+        sql("CREATE TABLE person (id int, val varchar)");
+
+        String data = IntStream.range(0, 1000).mapToObj((i) -> "(" + i + "," + i + ")").collect(joining(", "));
+        String insertSql = "INSERT INTO person (id, val) VALUES " + data;
+
+        sql(insertSql);
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "person p" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        // The query is executing on client.
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                Collection<? extends RunningQuery> queries = cliEngine.runningQueries();
+
+                return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+            },
+            TIMEOUT_IN_MS));
+
+        // The query is executing on sever.
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                Collection<? extends RunningQuery> queries = srvEngine.runningQueries();
+
+                return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+            },
+            TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = cliEngine.runningQueries();
+
+        assertEquals(1, running.size());
+
+        RunningQuery qry = F.first(running);
+
+        assertSame(qry, cliEngine.runningQuery(qry.id()));
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> cliEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> srvEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Execute query with a lot of JOINs to produce very long excution phase.
+     * Cancel query on execution phase on remote node (no query originator node)
+     * and check query registry is empty on the all nodes of the cluster.
+     */
+    @Test
+    public void testCancelByRemoteFragment() throws IgniteCheckedException {
+        QueryEngine clientEngine = queryProcessor(client);
+        QueryEngine serverEngine = queryProcessor(srv);
+        int cnt = 6;
+
+        sql("CREATE TABLE t (id int, val varchar)");
+
+        String data = IntStream.range(0, 10000).mapToObj((i) -> "(" + i + ",'" + i + "')").collect(joining(", "));
+        String insertSql = "INSERT INTO t (id, val) VALUES " + data;
+
+        sql(insertSql);
+
+        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "t t" + i).collect(joining(", "));
+        String sql = "SELECT * FROM " + bigJoin;
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> {
+                Collection<? extends RunningQuery> queries = clientEngine.runningQueries();
+
+                return !queries.isEmpty() && F.first(queries).state() == QueryState.EXECUTING;
+            },
+            TIMEOUT_IN_MS));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(() -> !serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        Collection<? extends RunningQuery> running = serverEngine.runningQueries();
+        RunningQuery qry = F.first(running);
+
+        assertSame(qry, serverEngine.runningQuery(qry.id()));
+
+        qry.cancel();
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> clientEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        Assert.assertTrue(GridTestUtils.waitForCondition(
+            () -> serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing.");
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
index f56a3d1..0a85fae 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SetOpIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.List;
+
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index b7c02c4..57ac900 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -60,7 +60,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createTableSimpleCase() {
         Set<String> cachesBefore = new HashSet<>(client.cacheNames());
 
-        executeSql("create table my_table (id int primary key, val varchar)");
+        sql("create table my_table (id int primary key, val varchar)");
 
         Set<String> newCaches = new HashSet<>(client.cacheNames());
         newCaches.removeAll(cachesBefore);
@@ -91,7 +91,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void createTableDifferentDataTypes() {
-        executeSql("create table my_table (" +
+        sql("create table my_table (" +
             "id int primary key, " +
             "c1 varchar, " +
             "c2 date, " +
@@ -107,7 +107,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
             "c12 decimal(20, 10) " +
             ")");
 
-        executeSql("insert into my_table values (" +
+        sql("insert into my_table values (" +
             "0, " +
             "'test', " +
             "date '2021-01-01', " +
@@ -123,7 +123,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
             "1234567890.1234567890" +
             ")");
 
-        List<List<?>> res = executeSql("select * from my_table");
+        List<List<?>> res = sql("select * from my_table");
 
         assertEquals(1, res.size());
 
@@ -152,7 +152,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createTableWithOptions() {
         Set<String> cachesBefore = new HashSet<>(client.cacheNames());
 
-        executeSql("create table my_table (id1 int, id2 int, val varchar, primary key(id1, id2)) with " +
+        sql("create table my_table (id1 int, id2 int, val varchar, primary key(id1, id2)) with " +
             " backups=2," +
             " affinity_key=id2," +
             " atomicity=transactional," +
@@ -199,8 +199,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createTableWithTemplate() {
         Set<String> cachesBefore = new HashSet<>(client.cacheNames());
 
-        executeSql("create table my_table_replicated (id int, val varchar) with template=replicated, cache_name=repl");
-        executeSql("create table my_table_partitioned (id int, val varchar) with template=partitioned, cache_name=part");
+        sql("create table my_table_replicated (id int, val varchar) with template=replicated, cache_name=repl");
+        sql("create table my_table_partitioned (id int, val varchar) with template=partitioned, cache_name=part");
 
         Set<String> newCaches = new HashSet<>(client.cacheNames());
         newCaches.removeAll(cachesBefore);
@@ -224,13 +224,13 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     @Test
     @SuppressWarnings("ThrowableNotThrown")
     public void createTableIfNotExists() {
-        executeSql("create table my_table (id int, val varchar)");
+        sql("create table my_table (id int, val varchar)");
 
         GridTestUtils.assertThrows(log,
-            () -> executeSql("create table my_table (id int, val varchar)"),
+            () -> sql("create table my_table (id int, val varchar)"),
             IgniteSQLException.class, "Table already exists: MY_TABLE");
 
-        executeSql("create table if not exists my_table (id int, val varchar)");
+        sql("create table if not exists my_table (id int, val varchar)");
     }
 
     /**
@@ -238,12 +238,12 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void createTableWithoutPk() {
-        executeSql("create table my_table (f1 int, f2 varchar)");
+        sql("create table my_table (f1 int, f2 varchar)");
 
-        executeSql("insert into my_table(f1, f2) values (1, '1'),(2, '2')");
-        executeSql("insert into my_table values (1, '1'),(2, '2')");
+        sql("insert into my_table(f1, f2) values (1, '1'),(2, '2')");
+        sql("insert into my_table values (1, '1'),(2, '2')");
 
-        assertThat(executeSql("select * from my_table"), hasSize(4));
+        assertThat(sql("select * from my_table"), hasSize(4));
     }
 
     /**
@@ -251,12 +251,12 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void createTableCustomSchema() {
-        executeSql("create table my_schema.my_table (f1 int, f2 varchar)");
+        sql("create table my_schema.my_table (f1 int, f2 varchar)");
 
-        executeSql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
-        executeSql("insert into my_schema.my_table values (1, '1'),(2, '2')");
+        sql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
+        sql("insert into my_schema.my_table values (1, '1'),(2, '2')");
 
-        assertThat(executeSql("select * from my_schema.my_table"), hasSize(4));
+        assertThat(sql("select * from my_schema.my_table"), hasSize(4));
     }
 
     /**
@@ -266,11 +266,11 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void createTableOnExistingCache() {
         IgniteCache<Object, Object> cache = client.getOrCreateCache("my_cache");
 
-        executeSql("create table my_schema.my_table (f1 int, f2 varchar) with cache_name=\"my_cache\"");
+        sql("create table my_schema.my_table (f1 int, f2 varchar) with cache_name=\"my_cache\"");
 
-        executeSql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
+        sql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
 
-        assertThat(executeSql("select * from my_schema.my_table"), hasSize(2));
+        assertThat(sql("select * from my_schema.my_table"), hasSize(2));
 
         assertEquals(2, cache.size(CachePeekMode.PRIMARY));
     }
@@ -280,8 +280,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void createTableAsSelectSimpleCase() {
-        executeSql("create table my_table as select 1 as i, 'test' as s");
-        List<List<?>> res = executeSql("select i, s from my_table");
+        sql("create table my_table as select 1 as i, 'test' as s");
+        List<List<?>> res = sql("select i, s from my_table");
 
         assertEquals(1, res.size());
 
@@ -296,8 +296,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void createTableAsSelectWithColumns() {
-        executeSql("create table my_table(i, s) as select 1 as a, 'test' as b");
-        List<List<?>> res = executeSql("select i, s from my_table");
+        sql("create table my_table(i, s) as select 1 as a, 'test' as b");
+        List<List<?>> res = sql("select i, s from my_table");
 
         assertEquals(1, res.size());
 
@@ -315,8 +315,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     @SuppressWarnings("unchecked")
     @Test
     public void createTableAsSelectWithOptions() {
-        executeSql("create table my_table(s, i) with cache_name=\"CacheWithOpts\", cache_group=\"CacheGroup\" as select '1', 1");
-        List<List<?>> res = executeSql("select * from my_table");
+        sql("create table my_table(s, i) with cache_name=\"CacheWithOpts\", cache_group=\"CacheGroup\" as select '1', 1");
+        List<List<?>> res = sql("select * from my_table");
 
         assertEquals(1, res.size());
 
@@ -335,8 +335,8 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void createTableAsSelectWithParameters() {
-        executeSql("create table my_table(s, i) as select cast(? as varchar), cast(? as int)", "a", 1);
-        List<List<?>> res = executeSql("select * from my_table");
+        sql("create table my_table(s, i) as select cast(? as varchar), cast(? as int)", "a", 1);
+        List<List<?>> res = sql("select * from my_table");
 
         assertEquals(1, res.size());
 
@@ -350,7 +350,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     @Test
     public void createTableAsSelectWrongColumnsCount() {
         GridTestUtils.assertThrowsAnyCause(log,
-            () -> executeSql("create table my_table(i, s1, s2) as select 1, 'test'"),
+            () -> sql("create table my_table(i, s1, s2) as select 1, 'test'"),
             IgniteSQLException.class, "Number of columns");
     }
 
@@ -359,13 +359,13 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void createTableAsSelectFromDistributedTable() {
-        executeSql("create table my_table1(i) as select x from table(system_range(1, 100))");
+        sql("create table my_table1(i) as select x from table(system_range(1, 100))");
 
-        assertEquals(100L, executeSql("select count(*) from my_table1").get(0).get(0));
+        assertEquals(100L, sql("select count(*) from my_table1").get(0).get(0));
 
-        executeSql("create table my_table2(i) as select * from my_table1");
+        sql("create table my_table2(i) as select * from my_table1");
 
-        assertEquals(100L, executeSql("select count(*) from my_table2").get(0).get(0));
+        assertEquals(100L, sql("select count(*) from my_table2").get(0).get(0));
     }
 
     /**
@@ -375,7 +375,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void dropTableDefaultSchema() {
         Set<String> cachesBefore = new HashSet<>(client.cacheNames());
 
-        executeSql("create table my_table (id int primary key, val varchar)");
+        sql("create table my_table (id int primary key, val varchar)");
 
         Set<String> cachesAfter = new HashSet<>(client.cacheNames());
         cachesAfter.removeAll(cachesBefore);
@@ -384,7 +384,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
 
         String createdCacheName = cachesAfter.iterator().next();
 
-        executeSql("drop table my_table");
+        sql("drop table my_table");
 
         cachesAfter = new HashSet<>(client.cacheNames());
         cachesAfter.removeAll(cachesBefore);
@@ -401,7 +401,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void dropTableCustomSchema() {
         Set<String> cachesBefore = new HashSet<>(client.cacheNames());
 
-        executeSql("create table my_schema.my_table (id int primary key, val varchar)");
+        sql("create table my_schema.my_table (id int primary key, val varchar)");
 
         Set<String> cachesAfter = new HashSet<>(client.cacheNames());
         cachesAfter.removeAll(cachesBefore);
@@ -410,7 +410,7 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
 
         String createdCacheName = cachesAfter.iterator().next();
 
-        executeSql("drop table my_schema.my_table");
+        sql("drop table my_schema.my_table");
 
         cachesAfter = new HashSet<>(client.cacheNames());
         cachesAfter.removeAll(cachesBefore);
@@ -427,19 +427,19 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     @Test
     @SuppressWarnings("ThrowableNotThrown")
     public void dropTableIfExists() {
-        executeSql("create table my_schema.my_table (id int primary key, val varchar)");
+        sql("create table my_schema.my_table (id int primary key, val varchar)");
 
         GridTestUtils.assertThrows(log,
-            () -> executeSql("drop table my_table"),
+            () -> sql("drop table my_table"),
             IgniteSQLException.class, "Table doesn't exist: MY_TABLE");
 
-        executeSql("drop table my_schema.my_table");
+        sql("drop table my_schema.my_table");
 
         GridTestUtils.assertThrows(log,
-            () -> executeSql("drop table my_schema.my_table"),
+            () -> sql("drop table my_schema.my_table"),
             IgniteSQLException.class, "Table doesn't exist: MY_TABLE");
 
-        executeSql("drop table if exists my_schema.my_table");
+        sql("drop table if exists my_schema.my_table");
     }
 
     /**
@@ -447,20 +447,20 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void alterTableAddDropSimpleCase() {
-        executeSql("create table my_table (id int primary key, val varchar)");
+        sql("create table my_table (id int primary key, val varchar)");
 
-        executeSql("alter table my_table add column val2 varchar");
+        sql("alter table my_table add column val2 varchar");
 
-        executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+        sql("insert into my_table (id, val, val2) values (0, '1', '2')");
 
-        List<List<?>> res = executeSql("select * from my_table ");
+        List<List<?>> res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals("2", res.get(0).get(2));
 
-        executeSql("alter table my_table drop column val2");
+        sql("alter table my_table drop column val2");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
@@ -471,21 +471,21 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void alterTableAddDropTwoColumns() {
-        executeSql("create table my_table (id int primary key, val varchar)");
+        sql("create table my_table (id int primary key, val varchar)");
 
-        executeSql("alter table my_table add column (val2 varchar, val3 int)");
+        sql("alter table my_table add column (val2 varchar, val3 int)");
 
-        executeSql("insert into my_table (id, val, val2, val3) values (0, '1', '2', 3)");
+        sql("insert into my_table (id, val, val2, val3) values (0, '1', '2', 3)");
 
-        List<List<?>> res = executeSql("select * from my_table ");
+        List<List<?>> res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals("2", res.get(0).get(2));
         assertEquals(3, res.get(0).get(3));
 
-        executeSql("alter table my_table drop column (val2, val3)");
+        sql("alter table my_table drop column (val2, val3)");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
@@ -496,20 +496,20 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void alterTableAddDropCustomSchema() {
-        executeSql("create table my_schema.my_table (id int primary key, val varchar)");
+        sql("create table my_schema.my_table (id int primary key, val varchar)");
 
-        executeSql("alter table my_schema.my_table add column val2 varchar");
+        sql("alter table my_schema.my_table add column val2 varchar");
 
-        executeSql("insert into my_schema.my_table (id, val, val2) values (0, '1', '2')");
+        sql("insert into my_schema.my_table (id, val, val2) values (0, '1', '2')");
 
-        List<List<?>> res = executeSql("select * from my_schema.my_table ");
+        List<List<?>> res = sql("select * from my_schema.my_table ");
 
         assertEquals(1, res.size());
         assertEquals("2", res.get(0).get(2));
 
-        executeSql("alter table my_schema.my_table drop column val2");
+        sql("alter table my_schema.my_table drop column val2");
 
-        res = executeSql("select * from my_schema.my_table ");
+        res = sql("select * from my_schema.my_table ");
 
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
@@ -522,29 +522,29 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     public void alterTableAddDropIfTableExists() {
         assertThrows("alter table my_table add val2 varchar", IgniteSQLException.class, "Table doesn't exist");
 
-        executeSql("alter table if exists my_table add column val2 varchar");
+        sql("alter table if exists my_table add column val2 varchar");
 
         assertThrows("alter table my_table drop column val2", IgniteSQLException.class, "Table doesn't exist");
 
-        executeSql("alter table if exists my_table drop column val2");
+        sql("alter table if exists my_table drop column val2");
 
-        executeSql("create table my_table (id int primary key, val varchar)");
+        sql("create table my_table (id int primary key, val varchar)");
 
-        executeSql("alter table if exists my_table add column val3 varchar");
+        sql("alter table if exists my_table add column val3 varchar");
 
-        executeSql("insert into my_table (id, val, val3) values (0, '1', '2')");
+        sql("insert into my_table (id, val, val3) values (0, '1', '2')");
 
-        List<List<?>> res = executeSql("select * from my_table ");
+        List<List<?>> res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals("2", res.get(0).get(2));
 
-        executeSql("alter table if exists my_table drop column val3");
+        sql("alter table if exists my_table drop column val3");
 
         assertThrows("alter table if exists my_table drop column val3", IgniteSQLException.class,
             "Column doesn't exist");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
@@ -555,50 +555,50 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void alterTableAddDropIfColumnExists() {
-        executeSql("create table my_table (id int primary key, val varchar)");
+        sql("create table my_table (id int primary key, val varchar)");
 
-        executeSql("insert into my_table (id, val) values (0, '1')");
+        sql("insert into my_table (id, val) values (0, '1')");
 
         assertThrows("alter table my_table add column val varchar", IgniteSQLException.class,
             "Column already exist");
 
-        executeSql("alter table my_table add column if not exists val varchar");
+        sql("alter table my_table add column if not exists val varchar");
 
-        List<List<?>> res = executeSql("select * from my_table ");
+        List<List<?>> res = sql("select * from my_table ");
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
 
         assertThrows("alter table my_table drop column val2", IgniteSQLException.class,
             "Column doesn't exist");
 
-        executeSql("alter table my_table drop column if exists val2");
+        sql("alter table my_table drop column if exists val2");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
 
-        executeSql("alter table my_table add column if not exists val3 varchar");
+        sql("alter table my_table add column if not exists val3 varchar");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
         assertEquals(1, res.size());
         assertEquals(3, res.get(0).size());
 
-        executeSql("alter table my_table drop column if exists val3");
+        sql("alter table my_table drop column if exists val3");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
 
         // Mixing existsing and not existsing columns
-        executeSql("alter table my_table add column if not exists (val varchar, val4 varchar)");
+        sql("alter table my_table add column if not exists (val varchar, val4 varchar)");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
         assertEquals(1, res.size());
         assertEquals(3, res.get(0).size());
 
-        executeSql("alter table my_table drop column if exists (val4, val5)");
+        sql("alter table my_table drop column if exists (val4, val5)");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
     }
@@ -608,16 +608,16 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void alterTableAddNotNullColumn() {
-        executeSql("create table my_table (id int primary key, val varchar)");
+        sql("create table my_table (id int primary key, val varchar)");
 
-        executeSql("alter table my_table add column val2 varchar not null");
+        sql("alter table my_table add column val2 varchar not null");
 
         assertThrows("insert into my_table (id, val, val2) values (0, '1', null)", IgniteSQLException.class,
             "Null value is not allowed");
 
-        executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+        sql("insert into my_table (id, val, val2) values (0, '1', '2')");
 
-        List<List<?>> res = executeSql("select * from my_table ");
+        List<List<?>> res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals("2", res.get(0).get(2));
@@ -628,11 +628,11 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void alterTableDropForbiddenColumn() {
-        executeSql("create table my_table (id int primary key, val varchar, val2 varchar)");
+        sql("create table my_table (id int primary key, val varchar, val2 varchar)");
 
-        executeSql("create index my_index on my_table(val)");
+        sql("create index my_index on my_table(val)");
 
-        executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+        sql("insert into my_table (id, val, val2) values (0, '1', '2')");
 
         assertThrows("alter table my_table drop column id", IgniteSQLException.class,
             "Cannot drop column");
@@ -640,14 +640,14 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
         assertThrows("alter table my_table drop column val", IgniteSQLException.class,
             "Cannot drop column");
 
-        List<List<?>> res = executeSql("select * from my_table ");
+        List<List<?>> res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(3, res.get(0).size());
 
-        executeSql("alter table my_table drop column val2");
+        sql("alter table my_table drop column val2");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
@@ -658,39 +658,39 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void alterTableServerAndClient() throws Exception {
-        executeSql(grid(0), "create table my_table (id int primary key, val varchar)");
+        sql(grid(0), "create table my_table (id int primary key, val varchar)");
 
-        executeSql(grid(0), "alter table my_table add column val2 varchar");
+        sql(grid(0), "alter table my_table add column val2 varchar");
 
-        executeSql(grid(0), "insert into my_table (id, val, val2) values (0, '1', '2')");
+        sql(grid(0), "insert into my_table (id, val, val2) values (0, '1', '2')");
 
-        List<List<?>> res = executeSql(grid(0), "select * from my_table ");
+        List<List<?>> res = sql(grid(0), "select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(3, res.get(0).size());
 
-        executeSql(grid(0), "drop table my_table");
+        sql(grid(0), "drop table my_table");
 
         awaitPartitionMapExchange();
 
-        executeSql("create table my_table (id int primary key, val varchar)");
+        sql("create table my_table (id int primary key, val varchar)");
 
-        executeSql("alter table my_table add column val2 varchar");
+        sql("alter table my_table add column val2 varchar");
 
-        executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+        sql("insert into my_table (id, val, val2) values (0, '1', '2')");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(3, res.get(0).size());
 
         awaitPartitionMapExchange();
 
-        executeSql(grid(0), "alter table my_table drop column val2");
+        sql(grid(0), "alter table my_table drop column val2");
 
         awaitPartitionMapExchange();
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
@@ -701,20 +701,20 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
      */
     @Test
     public void alterTableDropAddColumn() {
-        executeSql("create table my_table (id int primary key, val varchar, val2 varchar)");
+        sql("create table my_table (id int primary key, val varchar, val2 varchar)");
 
-        executeSql("insert into my_table (id, val, val2) values (0, '1', '2')");
+        sql("insert into my_table (id, val, val2) values (0, '1', '2')");
 
-        executeSql("alter table my_table drop column val2");
+        sql("alter table my_table drop column val2");
 
-        List<List<?>> res = executeSql("select * from my_table ");
+        List<List<?>> res = sql("select * from my_table ");
 
         assertEquals(1, res.size());
         assertEquals(2, res.get(0).size());
 
-        executeSql("alter table my_table add column val2 varchar not null");
+        sql("alter table my_table add column val2 varchar not null");
 
-        res = executeSql("select * from my_table ");
+        res = sql("select * from my_table ");
         assertEquals(1, res.size());
         assertEquals(3, res.get(0).size());
         // The command DROP COLUMN does not remove actual data from the cluster, it's a known and documented limitation.
@@ -723,9 +723,9 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
         assertThrows("insert into my_table (id, val, val2) values (1, '2', null)", IgniteSQLException.class,
             "Null value is not allowed");
 
-        executeSql("insert into my_table (id, val, val2) values (1, '2', '3')");
+        sql("insert into my_table (id, val, val2) values (1, '2', '3')");
 
-        assertEquals(2, executeSql("select * from my_table").size());
+        assertEquals(2, sql("select * from my_table").size());
     }
 
     /**
@@ -740,11 +740,11 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
 
         assertTrue(client.cluster().isWalEnabled(cacheName));
 
-        executeSql("alter table \"" + cacheName + "\".Integer nologging");
+        sql("alter table \"" + cacheName + "\".Integer nologging");
 
         assertFalse(client.cluster().isWalEnabled(cacheName));
 
-        executeSql("alter table \"" + cacheName + "\".Integer logging");
+        sql("alter table \"" + cacheName + "\".Integer logging");
 
         assertTrue(client.cluster().isWalEnabled(cacheName));
     }
@@ -760,9 +760,9 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
             "INSERT INTO test(val0, val1, val2) VALUES(1, 'test1', 10);" +
             "ALTER TABLE test DROP COLUMN val2;";
 
-        executeSql(multiLineQuery);
+        sql(multiLineQuery);
 
-        List<List<?>> res = executeSql("SELECT * FROM test order by val0");
+        List<List<?>> res = sql("SELECT * FROM test order by val0");
         assertEquals(2, res.size());
 
         for (int i = 0; i < res.size(); i++) {
@@ -775,17 +775,6 @@ public class TableDdlIntegrationTest extends AbstractDdlIntegrationTest {
     }
 
     /**
-     * Asserts that executeSql throws an exception.
-     *
-     * @param sql Query.
-     * @param cls Exception class.
-     * @param msg Error message.
-     */
-    private void assertThrows(String sql, Class<? extends Exception> cls, String msg) {
-        assertThrowsAnyCause(log, () -> executeSql(sql), cls, msg);
-    }
-
-    /**
      * Matcher to verify that an object of the expected type and matches the given predicat.
      *
      * @param desc Description for this matcher.
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDdlIntegrationTest.java
index db5a907..e4b55afb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDdlIntegrationTest.java
@@ -75,21 +75,21 @@ public class UserDdlIntegrationTest extends AbstractDdlIntegrationTest {
         for (Ignite ignite : G.allGrids()) {
             IgniteEx igniteEx = (IgniteEx)ignite;
 
-            executeSql(igniteEx, "CREATE USER test WITH PASSWORD 'test'");
+            sql(igniteEx, "CREATE USER test WITH PASSWORD 'test'");
 
             SecurityContext secCtx = authenticate(igniteEx, "TEST", "test");
 
             assertNotNull(secCtx);
             assertEquals("TEST", secCtx.subject().login());
 
-            executeSql(igniteEx, "ALTER USER test WITH PASSWORD 'newpasswd'");
+            sql(igniteEx, "ALTER USER test WITH PASSWORD 'newpasswd'");
 
             secCtx = authenticate(igniteEx, "TEST", "newpasswd");
 
             assertNotNull(secCtx);
             assertEquals("TEST", secCtx.subject().login());
 
-            executeSql(igniteEx, "DROP USER test");
+            sql(igniteEx, "DROP USER test");
 
             GridTestUtils.assertThrowsWithCause(() -> authenticate(igniteEx, "TEST", "newpasswd"),
                 IgniteAccessControlException.class);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 3039a32..c3499c9 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.plan.RelOptCluster;
@@ -68,7 +69,6 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.PlannerHelper;
 import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 7f803a2..1b5779d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -168,12 +168,11 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(phys);
 
-        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
-            new Splitter().go(phys)), null);
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
 
         assertNotNull(plan);
 
-        plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+        plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
 
         assertNotNull(plan);
 
@@ -277,12 +276,11 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(phys);
 
-        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
-            new Splitter().go(phys)), null);
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
 
         assertNotNull(plan);
 
-        plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+        plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
 
         List<Fragment> fragments = plan.fragments();
         assertEquals(2, fragments.size());
@@ -503,12 +501,11 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(phys);
 
-        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
-            new Splitter().go(phys)), null);
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
 
         assertNotNull(plan);
 
-        plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+        plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
 
         List<Fragment> fragments = plan.fragments();
         assertEquals(2, fragments.size());
@@ -721,12 +718,11 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(phys);
 
-        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
-                new Splitter().go(phys)), null);
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
 
         assertNotNull(plan);
 
-        plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+        plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
 
         assertNotNull(plan);
 
@@ -805,12 +801,11 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(phys);
 
-        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
-                new Splitter().go(phys)), null);
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
 
         assertNotNull(plan);
 
-        plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+        plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
 
         assertEquals(3, plan.fragments().size());
     }
@@ -886,12 +881,11 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(phys);
 
-        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
-                new Splitter().go(phys)), null);
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
 
         assertNotNull(plan);
 
-        plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+        plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
 
         assertNotNull(plan);
 
@@ -969,12 +963,11 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(phys);
 
-        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
-            new Splitter().go(phys)), null);
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
 
         assertNotNull(plan);
 
-        plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+        plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
 
         assertEquals(3, plan.fragments().size());
     }
@@ -1046,12 +1039,11 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(phys);
 
-        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
-            new Splitter().go(phys)), null);
+        MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(new Splitter().go(phys)), null);
 
         assertNotNull(plan);
 
-        plan.init(Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
+        plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
 
         assertNotNull(plan);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
index 04a97db..055eef6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
@@ -83,4 +83,9 @@ public class GridQueryCancel {
         if (canceled)
             throw new QueryCancelledException();
     }
+
+    /** */
+    public synchronized boolean isCanceled() {
+        return canceled;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
index 175c745..27d1ce7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -38,4 +40,14 @@ public class NoOpQueryEngine extends GridProcessorAdapter implements QueryEngine
         Object... params) throws IgniteSQLException {
         return Collections.emptyList();
     }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends RunningQuery> runningQueries() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public RunningQuery runningQuery(UUID id) {
+        return null;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
index 7c168fa..e1206af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
+
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.jetbrains.annotations.Nullable;
@@ -36,4 +39,10 @@ public interface QueryEngine extends GridProcessor {
      */
     List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String schemaName, String qry, Object... params)
         throws IgniteSQLException;
+
+    /** */
+    Collection<? extends RunningQuery> runningQueries();
+
+    /** */
+    RunningQuery runningQuery(UUID id);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
similarity index 72%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
index 311ef65..a6cd8cb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
@@ -15,17 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query;
 
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
+/** */
+public enum QueryState {
+    /** */
+    INITED,
 
-/**
- *
- */
-public interface SchemaHolder extends Service {
-    /**
-     * @return Schema.
-     */
-    SchemaPlus schema();
+    /** */
+    PLANNING,
+
+    /** */
+    MAPPING,
+
+    /** */
+    EXECUTING,
+
+    /** */
+    CLOSING,
+
+    /** */
+    CLOSED
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
similarity index 73%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
index 311ef65..447e925 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
@@ -15,17 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.schema;
+package org.apache.ignite.internal.processors.query;
 
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import java.util.UUID;
 
 /**
  *
  */
-public interface SchemaHolder extends Service {
-    /**
-     * @return Schema.
-     */
-    SchemaPlus schema();
+public interface RunningQuery {
+    /** */
+    UUID id();
+
+    /** */
+    QueryState state();
+
+    /** */
+    void cancel();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java
index f82f022..51d77bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlCommandProcessor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql;
 
 import java.util.LinkedHashMap;
 import java.util.List;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCluster;
 import org.apache.ignite.IgniteLogger;
@@ -40,7 +41,6 @@ import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQuerySchemaManager;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
 import org.apache.ignite.internal.sql.command.SqlAlterUserCommand;
@@ -88,17 +88,14 @@ public class SqlCommandProcessor {
     /**
      * Execute command.
      *
-     * @param sql SQL.
      * @param cmdNative Native command.
-     * @param cliCtx Client context.
      * @return Result.
      */
-    @Nullable public FieldsQueryCursor<List<?>> runCommand(String sql, SqlCommand cmdNative,
-        @Nullable SqlClientContext cliCtx) {
+    @Nullable public FieldsQueryCursor<List<?>> runCommand(SqlCommand cmdNative) {
         assert cmdNative != null;
 
         if (isDdl(cmdNative))
-            runCommandNativeDdl(sql, cmdNative);
+            runCommandNativeDdl(cmdNative);
         else if (cmdNative instanceof SqlKillComputeTaskCommand)
             processKillComputeTaskCommand((SqlKillComputeTaskCommand) cmdNative);
         else if (cmdNative instanceof SqlKillTransactionCommand)
@@ -192,10 +189,9 @@ public class SqlCommandProcessor {
     /**
      * Run DDL statement.
      *
-     * @param sql Original SQL.
      * @param cmd Command.
      */
-    private void runCommandNativeDdl(String sql, SqlCommand cmd) {
+    private void runCommandNativeDdl(SqlCommand cmd) {
         IgniteInternalFuture<?> fut = null;
 
         try {
@@ -306,7 +302,7 @@ public class SqlCommandProcessor {
                 ctx.security().dropUser(dropCmd.userName());
             }
             else
-                throw new IgniteSQLException("Unsupported DDL operation: " + sql,
+                throw new IgniteSQLException("Unsupported DDL operation: " + cmd,
                     IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
             if (fut != null)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index f727505..65fd9c7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -442,7 +442,7 @@ public class CommandProcessor extends SqlCommandProcessor {
     public FieldsQueryCursor<List<?>> runNativeCommand(String sql, SqlCommand cmdNative,
         QueryParameters params, @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
         if (super.isCommandSupported(cmdNative))
-            return runCommand(sql, cmdNative, cliCtx);
+            return runCommand(cmdNative);
 
         if (cmdNative instanceof SqlBulkLoadCommand)
             return processBulkLoadCommand((SqlBulkLoadCommand) cmdNative, qryId);