You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/02/17 10:21:22 UTC

[ignite] branch ignite-12248 updated: IGNITE-12602: Calcite integration. JDBC Thin driver support. This closes #7377

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new b9431b7  IGNITE-12602: Calcite integration. JDBC Thin driver support. This closes #7377
b9431b7 is described below

commit b9431b7880d97256cc51c35e6e16ba7cc6aee99b
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Mon Feb 17 13:21:05 2020 +0300

    IGNITE-12602: Calcite integration. JDBC Thin driver support. This closes #7377
---
 modules/calcite/pom.xml                            |   6 +
 .../query/calcite/CalciteQueryProcessor.java       |   8 +-
 .../calcite/exec/ClosableIteratorsHolder.java      | 164 +++++++
 .../query/calcite/exec/ConsumerNode.java           |  10 +-
 .../query/calcite/exec/ExchangeService.java        |   6 +-
 .../query/calcite/exec/ExchangeServiceImpl.java    |  27 +-
 .../query/calcite/exec/ExecutionContext.java       |   9 +-
 .../query/calcite/exec/ExecutionService.java       |   2 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   | 538 ++++++++++++++-------
 .../processors/query/calcite/exec/Implementor.java |   4 +-
 .../query/calcite/exec/MailboxRegistry.java        |  11 +-
 .../query/calcite/exec/MailboxRegistryImpl.java    |  16 +-
 .../query/calcite/exec/QueryCancelGroup.java       |  92 ++++
 .../processors/query/calcite/exec/ScanNode.java    |   4 +-
 .../query/calcite/message/MessageService.java      |  12 +-
 .../query/calcite/message/MessageServiceImpl.java  |  76 ++-
 .../query/calcite/metadata/FragmentInfo.java       |   2 +-
 .../calcite/metadata/IgniteMdDistribution.java     |   8 +-
 .../calcite/metadata/IgniteMdFragmentInfo.java     |  38 +-
 .../query/calcite/metadata/IgniteMetadata.java     |   4 +-
 .../metadata/OptimisticPlanningException.java      |   2 +-
 .../query/calcite/metadata/RelMetadataQueryEx.java |   2 +-
 .../calcite/prepare/CalciteQueryFieldMetadata.java | 116 +++++
 .../calcite/{splitter => prepare}/Cloner.java      |   6 +-
 .../query/calcite/{splitter => prepare}/Edge.java  |   2 +-
 .../calcite/{splitter => prepare}/Fragment.java    |   5 +-
 .../query/calcite/prepare/IgnitePlanner.java       | 215 ++++----
 .../query/calcite/prepare/IgnitePrograms.java      |  92 ++++
 .../query/calcite/prepare/MultiStepPlan.java       |  49 ++
 .../MultiStepPlanImpl.java}                        |  45 +-
 .../query/calcite/prepare/PlannerPhase.java        |  29 +-
 .../query/calcite/prepare/PlannerType.java         |  26 -
 .../query/calcite/prepare/PlanningContext.java     |  50 +-
 .../query/calcite/prepare/QueryPlan.java           |  37 ++
 .../query/calcite/prepare/QueryPlanCache.java      |   4 +-
 .../query/calcite/prepare/QueryPlanCacheImpl.java  |  25 +-
 .../query/calcite/prepare/QueryPlanFactory.java    |   4 +-
 .../calcite/{splitter => prepare}/RelSource.java   |   3 +-
 .../{splitter => prepare}/RelSourceImpl.java       |   2 +-
 .../calcite/{splitter => prepare}/RelTarget.java   |   2 +-
 .../{splitter => prepare}/RelTargetImpl.java       |   2 +-
 .../query/calcite/prepare/RowMetadata.java         |  59 +++
 .../calcite/{splitter => prepare}/Splitter.java    |  17 +-
 .../query/calcite/prepare/ValidationResult.java    |  68 +++
 .../query/calcite/rel/IgniteConvention.java        |   8 -
 .../query/calcite/rel/IgniteReceiver.java          |   2 +-
 .../processors/query/calcite/rel/IgniteSender.java |   2 +-
 .../query/calcite/schema/SchemaHolderImpl.java     |   1 +
 .../query/calcite/schema/TableDescriptorImpl.java  | 127 ++++-
 .../calcite/serialize/relation/ReceiverNode.java   |   4 +-
 .../calcite/serialize/relation/SenderNode.java     |   2 +-
 .../processors/query/calcite/util/Commons.java     |  21 +-
 .../calcite/util/ConvertingClosableIterator.java   |  59 +++
 .../query/calcite/util/IgniteMethod.java           |   4 +-
 .../query/calcite/util/ListFieldsQueryCursor.java  |  75 +--
 .../processors/query/calcite/util/TableScan.java   |   2 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  14 +-
 .../processors/query/calcite/PlannerTest.java      |  70 +--
 .../query/calcite/exec/AbstractExecutionTest.java  |   8 +-
 .../calcite/exec/ClosableIteratorsHolderTest.java  | 101 ++++
 .../processors/query/calcite/exec/OutboxTest.java  |   6 +-
 .../query/calcite/jdbc/JdbcQueryTest.java          |  85 ++++
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |   4 +-
 .../internal/jdbc/thin/ConnectionProperties.java   |  12 +
 .../jdbc/thin/ConnectionPropertiesImpl.java        |  17 +-
 .../ignite/internal/jdbc/thin/JdbcThinTcpIo.java   |   8 +-
 .../internal/processors/cache/QueryCursorImpl.java |   2 +-
 .../processors/cache/query/QueryCursorEx.java      |   7 +
 .../odbc/jdbc/JdbcConnectionContext.java           |  11 +-
 .../processors/odbc/jdbc/JdbcQueryCursor.java      |  10 +-
 .../processors/odbc/jdbc/JdbcRequestHandler.java   |  49 +-
 .../cache/query/PlatformContinuousQueryImpl.java   |   4 +
 .../internal/processors/query/QueryContext.java    |   2 +-
 73 files changed, 1967 insertions(+), 649 deletions(-)

diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index b546eca..3b30977 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -98,6 +98,12 @@
             <version>${spring.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-clients</artifactId>
+            <version>2.9.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index fc87608..27a092a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.calcite;
 
-import java.util.Collections;
 import java.util.List;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCostImpl;
 import org.apache.calcite.sql.fun.SqlLibrary;
 import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -66,6 +66,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
                 // Lexical configuration defines how identifiers are quoted, whether they are converted to upper or lower
                 // case when they are read, and whether identifiers are matched case-sensitively.
                 .setLex(Lex.ORACLE)
+//                .setParserFactory(SqlDdlParserImpl.FACTORY) // Enables DDL support
                 .build())
             // Dialects support.
             .operatorTable(SqlLibraryOperatorTableFactory.INSTANCE
@@ -75,7 +76,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
             // Context provides a way to store data within the planner session that can be accessed in planner rules.
             .context(Contexts.empty())
             // Custom cost factory to use during optimization
-            .costFactory(null)
+            .costFactory(RelOptCostImpl.FACTORY)
             .typeSystem(IgniteTypeSystem.INSTANCE)
             .build();
 
@@ -251,8 +252,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
         String query, Object... params) throws IgniteSQLException {
         
-        // TODO multiline query.
-        return Collections.singletonList(executionService.executeQuery(qryCtx, schemaName, query, params));
+        return executionService.executeQuery(qryCtx, schemaName, query, params);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
new file mode 100644
index 0000000..d3910c9
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ClosableIteratorsHolder {
+    /** */
+    private final ReferenceQueue refQueue;
+
+    /** */
+    private final Map<Reference, Object> refMap;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private volatile boolean stopped;
+
+    /** */
+    private Thread cleanWorker;
+
+    /** */
+    public ClosableIteratorsHolder(IgniteLogger log) {
+        this.log = log;
+
+        refQueue = new ReferenceQueue<>();
+        refMap = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * @param src Closeable iterator.
+     * @return Weak closable iterator wrapper.
+     */
+    public <T> Iterator<T> iterator(final Iterator<T> src) {
+        cleanUp(false);
+
+        return new DelegatingIterator<>(src);
+    }
+
+    /** */
+    public void init() {
+        cleanWorker = new Thread(() -> cleanUp(true));
+        cleanWorker.setDaemon(true);
+        cleanWorker.start();
+    }
+
+    /** */
+    public void tearDown() {
+        stopped = true;
+        refMap.clear();
+        U.interrupt(cleanWorker);
+    }
+
+    /** */
+    private void cleanUp(boolean blocking) {
+        for (Reference<?> ref = nextRef(blocking); !stopped && ref != null; ref = nextRef(blocking))
+            Commons.close(refMap.remove(ref), log);
+    }
+
+    /** */
+    private Reference nextRef(boolean blocking) {
+        try {
+            return !blocking ? refQueue.poll() : refQueue.remove();
+        }
+        catch (InterruptedException ignored) {
+            return null;
+        }
+    }
+
+    /** */
+    private AutoCloseable closeable(Object referent, Object resource) {
+        if (!(resource instanceof AutoCloseable))
+            return null;
+
+        return new CloseableReference(referent, resource);
+    }
+
+    /** */
+    private final class DelegatingIterator<T> implements Iterator<T>, AutoCloseable {
+        /** */
+        private final Iterator<T> delegate;
+
+        /** */
+        private final AutoCloseable closeable;
+
+        /** */
+        private DelegatingIterator(Iterator<T> delegate) {
+            closeable = closeable(this, this.delegate = delegate);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T next() {
+            return delegate.next();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            delegate.remove();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void forEachRemaining(Consumer<? super T> action) {
+            delegate.forEachRemaining(action);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            Commons.close(closeable);
+        }
+    }
+
+    /** */
+    private final class CloseableReference extends WeakReference implements AutoCloseable {
+        /** */
+        private CloseableReference(Object referent, Object resource) {
+            super(referent, refQueue);
+
+            refMap.put(this, resource);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            try {
+                Commons.close(refMap.remove(this));
+            }
+            finally {
+                clear();
+            }
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
index a686866..19a474c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
@@ -134,8 +134,6 @@ public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<O
 
     /** {@inheritDoc} */
     @Override public void cancel() {
-        context().setCancelled();
-
         if (state != State.RUNNING)
             return;
 
@@ -144,6 +142,7 @@ public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<O
             if (state != State.RUNNING)
                 return;
 
+            context().setCancelled();
             state = State.CANCELLED;
             buff.clear();
             cond.signalAll();
@@ -155,8 +154,11 @@ public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<O
         context().execute(input()::cancel);
         onClose.accept(this);
     }
-    
-    public boolean canceled() {
+
+    /**
+     * @return Cancelled flag.
+     */
+    boolean canceled() {
         return state == State.CANCELLED;
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index 24b5383..4f9a05e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -42,7 +42,7 @@ public interface ExchangeService extends Service {
      * @param batchId Batch ID.
      * @param rows Data rows.
      */
-    void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows);
+    void sendBatch(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows);
 
     /**
      * Acknowledges a batch with given ID is processed.
@@ -54,7 +54,7 @@ public interface ExchangeService extends Service {
      * @param exchangeId Exchange ID.
      * @param batchId Batch ID.
      */
-    void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
+    void acknowledge(Inbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
 
     /**
      * Sends cancel request.
@@ -66,5 +66,5 @@ public interface ExchangeService extends Service {
      * @param exchangeId Exchange ID.
      * @param batchId Batch ID.
      */
-    void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
+    void cancel(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 7559f06..b94940b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 import org.apache.ignite.internal.processors.query.calcite.message.InboxCancelMessage;
@@ -95,18 +96,32 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
     }
 
     /** {@inheritDoc} */
-    @Override public void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
-        messageService().send(nodeId, new QueryBatchMessage(queryId, fragmentId, exchangeId, batchId, rows));
+    @Override public void sendBatch(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
+        try {
+            messageService().send(nodeId, new QueryBatchMessage(queryId, fragmentId, exchangeId, batchId, rows));
+        }
+        catch (IgniteCheckedException e) {
+            caller.cancel();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
-        messageService().send(nodeId, new QueryBatchAcknowledgeMessage(queryId, fragmentId, exchangeId, batchId));
+    @Override public void acknowledge(Inbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+        try {
+            messageService().send(nodeId, new QueryBatchAcknowledgeMessage(queryId, fragmentId, exchangeId, batchId));
+        }
+        catch (IgniteCheckedException e) {
+            caller.cancel();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
-        messageService().send(nodeId, new InboxCancelMessage(queryId, fragmentId, exchangeId, batchId));
+    @Override public void cancel(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+        try {
+            messageService().send(nodeId, new InboxCancelMessage(queryId, fragmentId, exchangeId, batchId));
+        }
+        catch (IgniteCheckedException ignored) {
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 0b4177a..e01f78b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -99,7 +99,7 @@ public class ExecutionContext implements DataContext {
      * @return Keep binary flag.
      */
     public boolean keepBinary() {
-        return false; // TODO
+        return true; // TODO
     }
 
     /**
@@ -116,6 +116,13 @@ public class ExecutionContext implements DataContext {
         return cancelled;
     }
 
+    /**
+     * @return Originating node ID.
+     */
+    public UUID originatingNodeId() {
+        return parent().originatingNodeId();
+    }
+
     /** {@inheritDoc} */
     @Override public SchemaPlus getRootSchema() {
         return ctx.schema();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
index 16afe41..59ba0a8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
@@ -37,7 +37,7 @@ public interface ExecutionService extends Service {
      * @param params Query parameters.
      * @return Query cursor.
      */
-    FieldsQueryCursor<List<?>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
+    List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
 
     /**
      * Cancels a running query.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index e4e1fd5..8116883 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -27,19 +29,25 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Pair;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -49,6 +57,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryCancellable;
 import org.apache.ignite.internal.processors.query.QueryContext;
@@ -58,15 +68,22 @@ import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import org.apache.ignite.internal.processors.query.calcite.message.QueryCancelRequest;
 import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
 import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
 import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
+import org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlanImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
+import org.apache.ignite.internal.processors.query.calcite.prepare.ValidationResult;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
@@ -75,9 +92,6 @@ import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.SenderNode;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
@@ -133,6 +147,9 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
     private ExchangeService exchangeService;
 
     /** */
+    private ClosableIteratorsHolder iteratorsHolder;
+
+    /** */
     private final Map<UUID, QueryInfo> running;
 
     /**
@@ -313,86 +330,36 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
         return exchangeManager;
     }
 
-    /** {@inheritDoc} */
-    @Override public FieldsQueryCursor<List<?>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params) {
-        UUID queryId = UUID.randomUUID();
-
-        PlanningContext pctx = createContext(ctx, schema, query, params);
-
-        QueryPlan plan = prepare(pctx);
-
-        // Local execution
-        Fragment local = F.first(plan.fragments());
-
-        if (U.assertionsEnabled()) {
-            assert local != null;
-
-            NodesMapping mapping = local.mapping();
-
-            assert mapping != null;
+    /**
+     * @param iteratorsHolder Iterators holder.
+     */
+    public void iteratorsHolder(ClosableIteratorsHolder iteratorsHolder) {
+        this.iteratorsHolder = iteratorsHolder;
+    }
 
-            List<UUID> nodes = mapping.nodes();
+    /**
+     * @return Iterators holder.
+     */
+    public ClosableIteratorsHolder iteratorsHolder() {
+        return iteratorsHolder;
+    }
 
-            assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(pctx.localNodeId());
+    /** {@inheritDoc} */
+    @Override public List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params) {
+        PlanningContext pctx = createContext(ctx, schema, query, params);
+        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER));
+        try (IgnitePlanner ignored = pctx.planner()) {
+            return Commons.transform(prepare(pctx), p -> executeSingle(UUID.randomUUID(), pctx, p));
         }
-
-        ExecutionContext ectx = new ExecutionContext(
-            taskExecutor(), pctx, queryId, local.fragmentId(), local.mapping().partitions(pctx.localNodeId()), Commons.parametersMap(params)
-        );
-
-        Node<Object[]> node = new Implementor(partitionService(), mailboxRegistry(), exchangeService(), failureProcessor(), ectx, log).go(local.root());
-
-        assert !(node instanceof SenderNode);
-
-        QueryInfo info = new QueryInfo(ectx, local.root().getRowType(), new ConsumerNode(ectx, node, this::onConsumerClose));
-
-        if (plan.fragments().size() == 1)
-            running.put(queryId, info);
-        else {
-            // remote execution
-            RelOp<IgniteRel, RelGraph> converter = new RelToGraphConverter();
-            List<Pair<UUID, QueryStartRequest>> requests = new ArrayList<>();
-
-            for (Fragment remote : plan.fragments().subList(1, plan.fragments().size())) {
-                long id = remote.fragmentId();
-                NodesMapping mapping = remote.mapping();
-                RelGraph graph = converter.go(remote.root());
-
-                for (UUID nodeId : mapping.nodes()) {
-                    info.addFragment(nodeId, id);
-
-                    QueryStartRequest req = new QueryStartRequest(
-                        queryId,
-                        id,
-                        schema,
-                        graph,
-                        pctx.topologyVersion(),
-                        mapping.partitions(nodeId),
-                        params);
-
-                    requests.add(Pair.of(nodeId, req));
-                }
-            }
-
-            running.put(queryId, info);
-
-            // start remote execution
-            for (Pair<UUID, QueryStartRequest> pair : requests)
-                messageService().send(pair.left, pair.right);
+        finally {
+            RelMetadataQuery.THREAD_PROVIDERS.remove();
         }
-
-        // start local execution
-        info.consumer.request();
-
-        info.awaitAllReplies();
-
-        // TODO weak map to stop query on cursor collected by GC.
-        return new ListFieldsQueryCursor<>(info.type(), info.<Object[]>iterator(), Arrays::asList);
     }
 
     /** {@inheritDoc} */
     @Override public void cancelQuery(UUID queryId) {
-        mailboxRegistry().outboxes(queryId).forEach(Outbox::cancel);
+        mailboxRegistry().outboxes(queryId).forEach(this::executeCancel);
+        mailboxRegistry().inboxes(queryId).forEach(this::executeCancel);
 
         QueryInfo info = running.get(queryId);
 
@@ -405,6 +372,7 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
         localNodeId(ctx.localNodeId());
         exchangeManager(ctx.cache().context().exchange());
         eventManager(ctx.event());
+        iteratorsHolder(new ClosableIteratorsHolder(log));
 
         CalciteQueryProcessor proc = Objects.requireNonNull(Commons.lookupComponent(ctx, CalciteQueryProcessor.class));
 
@@ -428,6 +396,8 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
         messageService().register((n,m) -> onMessage(n, (QueryCancelRequest) m), MessageType.QUERY_CANCEL_REQUEST);
 
         eventManager().addDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+
+        iteratorsHolder().init();
     }
 
     /** {@inheritDoc} */
@@ -435,6 +405,8 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
         eventManager().removeDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
         running.clear();
+
+        iteratorsHolder().tearDown();
     }
 
     /** */
@@ -443,65 +415,6 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
     }
 
     /** */
-    private QueryPlan prepare(PlanningContext ctx) {
-        CacheKey cacheKey = new CacheKey(ctx.schema().getName(), ctx.query());
-
-        QueryPlan plan = queryPlanCache().queryPlan(ctx, cacheKey, this::prepare0);
-
-        plan.init(mappingService(), ctx);
-
-        return plan;
-    }
-
-    /** */
-    private QueryPlan prepare0(PlanningContext ctx) {
-        IgnitePlanner planner = ctx.planner();
-
-        try {
-            String query = ctx.query();
-
-            assert query != null;
-
-            // Parse
-            SqlNode sqlNode = planner.parse(query);
-
-            // Validate
-            sqlNode = planner.validate(sqlNode);
-
-            // Convert to Relational operators graph
-            RelNode rel = planner.convert(sqlNode);
-
-            // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
-
-            RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.single())
-                .simplify();
-
-            IgniteRel igniteRel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
-
-            // Split query plan to multi-step one.
-            return new Splitter().go(igniteRel);
-        }
-        catch (SqlParseException e) {
-            IgniteSQLException ex = new IgniteSQLException("Failed to parse query.", IgniteQueryErrorCode.PARSING, e);
-            Commons.close(planner, ex);
-            throw ex;
-        }
-        catch (ValidationException e) {
-            IgniteSQLException ex = new IgniteSQLException("Failed to validate query.", IgniteQueryErrorCode.UNKNOWN, e);
-            Commons.close(planner, ex);
-            throw ex;
-        }
-        catch (Exception e) {
-            IgniteSQLException ex = new IgniteSQLException("Failed to plan query.", IgniteQueryErrorCode.UNKNOWN, e);
-            Commons.close(planner, ex);
-            throw ex;
-        }
-    }
-
-    /** */
     private PlanningContext createContext(@Nullable QueryContext qryCtx, @Nullable String schemaName, String query, Object[] params) {
         RelTraitDef<?>[] traitDefs = {
             ConventionTraitDef.INSTANCE
@@ -511,7 +424,7 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
 
         return PlanningContext.builder()
             .localNodeId(localNodeId())
-            .parentContext(Commons.convert(qryCtx)) // TODO Connection config on the basis of query context
+            .parentContext(Commons.convert(qryCtx))
             .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
                 .defaultSchema(schemaName != null
                     ? schemaHolder().schema().getSubSchema(schemaName)
@@ -521,6 +434,7 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
             .query(query)
             .parameters(params)
             .topologyVersion(topologyVersion())
+            .cancelGroup(cancelGroup(qryCtx))
             .logger(log)
             .build();
     }
@@ -550,17 +464,239 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
     }
 
     /** */
-    protected void onMessage(UUID nodeId, QueryStartRequest msg) {
+    private List<QueryPlan> prepare(PlanningContext ctx) {
+        return queryPlanCache().queryPlan(ctx, new CacheKey(ctx.schemaName(), ctx.query()), this::prepare0);
+    }
+
+    /** */
+    private List<QueryPlan> prepare0(PlanningContext ctx) {
+        try {
+            String query = ctx.query();
+
+            assert query != null;
+
+            // Parse query.
+            SqlNode sqlNode = ctx.planner().parse(query);
+
+            if (single(sqlNode))
+                return Collections.singletonList(prepareSingle(sqlNode, ctx));
+
+            List<SqlNode> nodes = ((SqlNodeList) sqlNode).getList();
+            List<QueryPlan> res = new ArrayList<>(nodes.size());
+
+            for (SqlNode node : nodes)
+                res.add(prepareSingle(node, ctx));
+
+            return res;
+        }
+        catch (IgniteSQLException e) {
+            throw e;
+        }
+        catch (SqlParseException e) {
+            throw new IgniteSQLException("Failed to parse query.", IgniteQueryErrorCode.PARSING, e);
+        }
+        catch (ValidationException e) {
+            throw new IgniteSQLException("Failed to validate query.", IgniteQueryErrorCode.PARSING, e);
+        }
+        catch (Exception e) {
+            throw new IgniteSQLException("Failed to plan query.", IgniteQueryErrorCode.UNKNOWN, e);
+        }
+    }
+
+    /** */
+    private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
+        assert single(sqlNode);
+
+        ctx.planner().reset();
+
+        if (SqlKind.QUERY.contains(sqlNode.getKind()))
+            return prepareQuery(sqlNode, ctx);
+
+        throw new IgniteSQLException("Unsupported operation [querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+    }
+
+    /** */
+    private QueryPlan prepareQuery(SqlNode sqlNode, PlanningContext ctx) throws ValidationException {
+        IgnitePlanner planner = ctx.planner();
+
+        // Validate
+        ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
+
+        sqlNode = validated.sqlNode();
+
+        // Convert to Relational operators graph
+        RelNode rel = planner.convert(sqlNode);
+
+        // Transformation chain
+        rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+        RelTraitSet desired = rel.getCluster().traitSet()
+            .replace(IgniteConvention.INSTANCE)
+            .replace(IgniteDistributions.single())
+            .simplify();
+
+        IgniteRel igniteRel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+        // Split query plan to query fragments.
+        List<Fragment> fragments = new Splitter().go(igniteRel);
+
+        return new MultiStepPlanImpl(fragments, fieldsMetadata(validated, ctx));
+    }
+
+    /** */
+    private FieldsQueryCursor<List<?>> executeSingle(UUID queryId, PlanningContext pctx, QueryPlan plan) {
+        if (plan.type() == QueryPlan.Type.QUERY)
+            return executeQuery(queryId, (MultiStepPlan) plan, pctx);
+
+        throw new AssertionError("Unexpected plan type: " + plan);
+    }
+
+    /** */
+    private FieldsQueryCursor<List<?>> executeQuery(UUID queryId, MultiStepPlan plan, PlanningContext pctx) {
+        plan.init(mappingService(), pctx);
+
+        List<Fragment> fragments = plan.fragments();
+
+        // Local execution
+        Fragment local = F.first(fragments);
+
+        if (U.assertionsEnabled()) {
+            assert local != null;
+
+            NodesMapping mapping = local.mapping();
+
+            assert mapping != null;
+
+            List<UUID> nodes = mapping.nodes();
+
+            assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(pctx.localNodeId());
+        }
+
+        ExecutionContext ectx = new ExecutionContext(taskExecutor(), pctx, queryId, local.fragmentId(),
+            local.mapping().partitions(pctx.localNodeId()), Commons.parametersMap(pctx.parameters()));
+
+        Node<Object[]> node = new Implementor(partitionService(), mailboxRegistry(), exchangeService(), failureProcessor(), ectx, log).go(local.root());
+
+        assert !(node instanceof SenderNode);
+
+        QueryInfo info = new QueryInfo(ectx, fragments, node);
+
+        // register query
+        register(info);
+
+        // start local execution
+        info.consumer.request();
+
+        // start remote execution
+        if (fragments.size() > 1) {
+            RelOp<IgniteRel, RelGraph> converter = new RelToGraphConverter();
+
+            for (int i = 1; i < fragments.size(); i++) {
+                Fragment fragment = fragments.get(i);
+
+                boolean error = false;
+
+                for (UUID nodeId : fragment.mapping().nodes()) {
+                    if (error)
+                        info.onResponse(nodeId, fragment.fragmentId(), new QueryCancelledException());
+                    else {
+                        try {
+                            QueryStartRequest req = new QueryStartRequest(
+                                queryId,
+                                fragment.fragmentId(),
+                                pctx.schemaName(),
+                                converter.go(fragment.root()),
+                                pctx.topologyVersion(),
+                                fragment.mapping().partitions(nodeId),
+                                pctx.parameters());
+
+                            messageService().send(nodeId, req);
+                        }
+                        catch (Exception e) {
+                            info.onResponse(nodeId, fragment.fragmentId(), e);
+                            error = true;
+                        }
+                    }
+                }
+
+                if (error) {
+                    info.awaitAllReplies();
+
+                    throw new AssertionError(); // Previous call must throw an exception
+                }
+            }
+        }
+
+        return new ListFieldsQueryCursor<>(info.iterator(), Arrays::asList, plan.fieldsMetadata());
+    }
+
+    /** */
+    private void register(QueryInfo info) {
+        UUID queryId = info.ctx.queryId();
+        PlanningContext pctx = info.ctx.parent();
+
+        running.put(queryId, info);
+
+        if (pctx.cancelGroup() == null || pctx.cancelGroup().add(info))
+            return;
+
+        running.remove(queryId);
+
+        throw new IgniteSQLException(QueryCancelledException.ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED);
+    }
+
+    /** */
+    private QueryCancelGroup cancelGroup(@Nullable QueryContext qryCtx) {
+        GridQueryCancel cancel;
+
+        if (qryCtx == null || (cancel = qryCtx.unwrap(GridQueryCancel.class)) == null)
+            return null;
+
+        return new QueryCancelGroup(cancel, failureProcessor());
+    }
+
+    /** */
+    private List<GridQueryFieldMetadata> fieldsMetadata(ValidationResult validationResult, PlanningContext ctx) {
+        List<RelDataTypeField> fields = validationResult.dataType().getFieldList();
+        List<List<String>> origins = validationResult.origins();
+
+        assert fields.size() == origins.size();
+
+        ImmutableList.Builder<GridQueryFieldMetadata> b = ImmutableList.builder();
+
+        for (int i = 0; i < fields.size(); i++) {
+            RelDataTypeField field = fields.get(i);
+            List<String> origin = origins.get(i);
+
+            b.add(new CalciteQueryFieldMetadata(
+                F.isEmpty(origin) ? null : origin.get(0),
+                F.isEmpty(origin) ? null : origin.get(1),
+                field.getName(),
+                String.valueOf(ctx.typeFactory().getJavaClass(field.getType())),
+                field.getType().getPrecision(),
+                field.getType().getScale()
+            ));
+        }
+
+        return b.build();
+    }
+
+    /** */
+    private boolean single(SqlNode sqlNode) {
+        return !(sqlNode instanceof SqlNodeList);
+    }
+
+    /** */
+    private void onMessage(UUID nodeId, QueryStartRequest msg) {
         assert nodeId != null && msg != null;
 
         PlanningContext ctx = createContext(msg.schema(), nodeId, msg.topologyVersion());
-
+        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER));
         try (IgnitePlanner planner = ctx.planner()) {
             IgniteRel root = planner.convert(msg.plan());
 
             assert root instanceof IgniteSender : root;
 
-            // TODO do we need a local optimisation phase here?
             ExecutionContext execCtx = new ExecutionContext(
                 taskExecutor(),
                 ctx,
@@ -574,12 +710,27 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
 
             assert node instanceof Outbox : node;
 
-            node.request();
+            node.context().execute(node::request);
 
             messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId()));
         }
         catch (Exception ex) {
-            messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
+            cancelQuery(msg.queryId());
+
+            if (ex instanceof ClusterTopologyCheckedException)
+                return;
+
+            try {
+                messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
+            }
+            catch (IgniteCheckedException e) {
+                e.addSuppressed(ex);
+
+                U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e);
+            }
+        }
+        finally {
+            RelMetadataQuery.THREAD_PROVIDERS.remove();
         }
     }
 
@@ -611,10 +762,28 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
     /** */
     private void onNodeLeft(UUID nodeId) {
         running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(nodeId));
+
+        final Predicate<Node<?>> p = new OriginatingFilter(nodeId);
+
+        mailboxRegistry().outboxes(null).stream()
+            .filter(p).forEach(this::executeCancel);
+
+        mailboxRegistry().inboxes(null).stream()
+            .filter(p).forEach(this::executeCancel);
     }
 
     /** */
-    private static class RemoteFragmentKey {
+    private void executeCancel(Node<?> node) {
+        node.context().execute(node::cancel);
+    }
+
+    /** */
+    private enum QueryState {
+        RUNNING, CANCELLING, CANCELLED
+    }
+
+    /** */
+    private static final class RemoteFragmentKey {
         /** */
         private final UUID nodeId;
 
@@ -650,19 +819,11 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
     }
 
     /** */
-    private enum QueryState {
-        RUNNING, CANCELLING, CANCELLED
-    }
-
-    /** */
     private final class QueryInfo implements QueryCancellable {
         /** */
         private final ExecutionContext ctx;
 
         /** */
-        private final RelDataType type;
-
-        /** */
         private final ConsumerNode consumer;
 
         /** remote nodes */
@@ -678,41 +839,36 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
         private Throwable error;
 
         /** */
-        private QueryInfo(ExecutionContext ctx, RelDataType type, ConsumerNode consumer) {
+        private QueryInfo(ExecutionContext ctx, List<Fragment> fragments, Node<Object[]> root) {
             this.ctx = ctx;
-            this.type = type;
-            this.consumer = consumer;
+
+            consumer = new ConsumerNode(ctx, root, ExecutionServiceImpl.this::onConsumerClose);
 
             remotes = new HashSet<>();
             waiting = new HashSet<>();
 
-            state = QueryState.RUNNING;
-        }
+            for (int i = 1; i < fragments.size(); i++) {
+                Fragment fragment = fragments.get(i);
+                long id = fragment.fragmentId();
+                List<UUID> nodes = fragment.mapping().nodes();
 
-        /** {@inheritDoc} */
-        @Override public void doCancel() {
-            cancel();
-        }
+                remotes.addAll(nodes);
 
-        /** */
-        private ConsumerNode localNode() {
-            return consumer;
-        }
+                for (UUID node : nodes)
+                    waiting.add(new RemoteFragmentKey(node, id));
+            }
 
-        /** */
-        private RelDataType type() {
-            return type;
+            state = QueryState.RUNNING;
         }
 
         /** */
-        private <T> Iterator<T> iterator() {
-            return (Iterator<T>) consumer;
+        public Iterator<Object[]> iterator() {
+            return iteratorsHolder().iterator(consumer);
         }
 
-        /** */
-        private void addFragment(UUID nodeId, long fragmentId) {
-            remotes.add(nodeId);
-            waiting.add(new RemoteFragmentKey(nodeId, fragmentId));
+        /** {@inheritDoc} */
+        @Override public void doCancel() {
+            cancel();
         }
 
         /** */
@@ -759,8 +915,21 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
             if (cancelLocal)
                 consumer.cancel();
 
-            if (cancelRemote)
-                messageService().send(remotes, new QueryCancelRequest(ctx.queryId()));
+            if (cancelRemote) {
+                QueryCancelRequest msg = new QueryCancelRequest(ctx.queryId());
+
+                for (UUID remote : remotes) {
+                    try {
+                        messageService().send(remote, msg);
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        U.warn(log, e.getMessage(), e);
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw U.convertException(e);
+                    }
+                }
+            }
 
             if (state0 == QueryState.CANCELLED)
                 running.remove(ctx.queryId());
@@ -822,4 +991,21 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
                 cancel();
         }
     }
+
+    /** */
+    private static class OriginatingFilter implements Predicate<Node<?>> {
+        /** */
+        private final UUID nodeId;
+
+        /** */
+        private OriginatingFilter(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean test(Node<?> node) {
+            // Uninitialized inbox doesn't know originating node ID.
+            return Objects.equals(node.context().originatingNodeId(), nodeId);
+        }
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
index 4199f7f..de2bfb8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
@@ -25,6 +25,8 @@ import org.apache.calcite.schema.ScannableTable;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
@@ -35,8 +37,6 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
 import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
index 7dd462f..b5b80f6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
@@ -17,9 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -77,16 +78,16 @@ public interface MailboxRegistry extends Service {
     /**
      * Returns all registered inboxes for provided query ID.
      *
-     * @param queryId Query ID.
+     * @param queryId Query ID. {@code null} means return all registered inboxes.
      * @return Registered inboxes.
      */
-    List<Inbox<?>> inboxes(UUID queryId);
+    Collection<Inbox<?>> inboxes(@Nullable UUID queryId);
 
     /**
      * Returns all registered outboxes for provided query ID.
      *
-     * @param queryId Query ID.
+     * @param queryId Query ID. {@code null} means return all registered outboxes.
      * @return Registered outboxes.
      */
-    List<Outbox<?>> outboxes(UUID queryId);
+    Collection<Outbox<?>> outboxes(@Nullable UUID queryId);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index cc6e0be..1846e3b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -80,7 +81,10 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
     }
 
     /** {@inheritDoc} */
-    @Override public List<Inbox<?>> inboxes(UUID queryId) {
+    @Override public Collection<Inbox<?>> inboxes(@Nullable UUID queryId) {
+        if (queryId == null)
+            return remotes.values();
+
         return remotes.entrySet().stream()
             .filter(e -> e.getKey().queryId.equals(queryId))
             .map(Map.Entry::getValue)
@@ -88,11 +92,15 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
     }
 
     /** {@inheritDoc} */
-    @Override public List<Outbox<?>> outboxes(UUID queryId) {
+    @Override public Collection<Outbox<?>> outboxes(@Nullable UUID queryId) {
+        if (queryId == null)
+            return locals.values();
+
         return locals.entrySet().stream()
             .filter(e -> e.getKey().queryId.equals(queryId))
             .map(Map.Entry::getValue)
-            .collect(Collectors.toList());    }
+            .collect(Collectors.toList());
+    }
 
     /** */
     private static class MailboxKey {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryCancelGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryCancelGroup.java
new file mode 100644
index 0000000..6e2573a
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryCancelGroup.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryCancellable;
+
+/** */
+public final class QueryCancelGroup implements QueryCancellable {
+    /** */
+    private final FailureProcessor failureProcessor;
+
+    /** */
+    private final Set<QueryCancellable> queries;
+
+    /** */
+    private boolean cancelled;
+
+    /** */
+    public QueryCancelGroup(GridQueryCancel cancel, FailureProcessor failureProcessor) {
+        this.failureProcessor = failureProcessor;
+
+        queries = new HashSet<>();
+
+        register(cancel);
+    }
+
+    /**
+     * Adds a cancellable to the group.
+     *
+     * @param query Query cancellable object.
+     * @return {@code false} if query was cancelled before this call.
+     */
+    public synchronized boolean add(QueryCancellable query) {
+        if (cancelled)
+            return false;
+
+        boolean res = queries.add(query);
+
+        assert res;
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void doCancel() {
+        cancelled = true;
+
+        try {
+            for (QueryCancellable query : queries)
+                query.doCancel();
+        }
+        catch (Exception e) {
+            failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+            throw e;
+        }
+    }
+
+    /** */
+    private void register(GridQueryCancel cancel) {
+        try {
+            cancel.set(this);
+        }
+        catch (QueryCancelledException e) {
+            throw new IgniteSQLException(QueryCancelledException.ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED);
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
index 393d26d..5405534 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
@@ -49,7 +49,7 @@ public class ScanNode extends AbstractNode<Object[]> implements SingleNode<Objec
             requestInternal();
         }
         catch (Exception e) {
-            Commons.close(it, e);
+            Commons.closeQuiet(it, e);
 
             throw e;
         }
@@ -98,7 +98,7 @@ public class ScanNode extends AbstractNode<Object[]> implements SingleNode<Objec
 
     /** {@inheritDoc} */
     @Override public void close() {
-        Commons.close(it);
+        Commons.closeQuiet(it);
 
         it = null;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index 02bbe46..3293365 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
-import java.util.Collection;
 import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
 
 /**
@@ -26,20 +26,12 @@ import org.apache.ignite.internal.processors.query.calcite.util.Service;
  */
 public interface MessageService extends Service {
     /**
-     * Sends a message to given nodes.
-     *
-     * @param nodeIds Nodes IDs.
-     * @param msg Message.
-     */
-    void send(Collection<UUID> nodeIds, CalciteMessage msg);
-
-    /**
      * Sends a message to given node.
      *
      * @param nodeId Node ID.
      * @param msg Message.
      */
-    void send(UUID nodeId, CalciteMessage msg);
+    void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
 
     /**
      * Registers a listener for messages of a given type.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index c6e3c2b..219fd92 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
-import java.util.Collection;
 import java.util.EnumMap;
 import java.util.Objects;
 import java.util.UUID;
@@ -27,7 +26,6 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -189,32 +187,16 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
     }
 
     /** {@inheritDoc} */
-    @Override public void send(Collection<UUID> nodeIds, CalciteMessage msg) {
-        for (UUID nodeId : nodeIds)
-            send(nodeId, msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void send(UUID nodeId, CalciteMessage msg) {
+    @Override public void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException {
         if (localNodeId().equals(nodeId))
             onMessage(nodeId, msg, true);
         else {
             byte plc = msg instanceof ExecutionContextAware ?
                 ((ExecutionContextAware) msg).ioPolicy() : GridIoPolicy.QUERY_POOL;
 
-            if (!prepareMarshal(msg))
-                return;
-
-            try {
-                ioManager().sendToGridTopic(nodeId, GridTopic.TOPIC_QUERY, msg, plc);
-            }
-            catch (ClusterTopologyCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send message, node failed: " + nodeId);
-            }
-            catch (IgniteCheckedException e) {
-                failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
-            }
+            prepareMarshal(msg);
+
+            ioManager().sendToGridTopic(nodeId, GridTopic.TOPIC_QUERY, msg, plc);
         }
     }
 
@@ -229,45 +211,41 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
     }
 
     /** */
-    protected void onMessage(UUID nodeId, CalciteMessage msg, boolean async) {
-        if (msg instanceof ExecutionContextAware) {
-            ExecutionContextAware msg0 = (ExecutionContextAware) msg;
-            taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () -> onMessageInternal(nodeId, msg));
-        }
-        else if (async)
-            taskExecutor().execute(IgniteUuid.VM_ID, ThreadLocalRandom.current().nextLong(1024), () -> onMessageInternal(nodeId, msg));
-        else
-            onMessageInternal(nodeId, msg);
-    }
-
-    /** */
-    protected boolean prepareMarshal(Message msg) {
+    protected void prepareMarshal(Message msg) throws IgniteCheckedException {
         try {
             if (msg instanceof MarshalableMessage)
                 ((MarshalableMessage) msg).prepareMarshal(marshaller());
-
-            return true;
         }
         catch (IgniteCheckedException e) {
             failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
-        }
 
-        return false;
+            throw e;
+        }
     }
 
     /** */
-    protected boolean prepareUnmarshal(Message msg) {
+    protected void prepareUnmarshal(Message msg) throws IgniteCheckedException {
         try {
             if (msg instanceof MarshalableMessage)
                 ((MarshalableMessage) msg).prepareUnmarshal(marshaller(), classLoader());
-
-            return true;
         }
         catch (IgniteCheckedException e) {
             failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+            throw e;
         }
+    }
 
-        return false;
+    /** */
+    protected void onMessage(UUID nodeId, CalciteMessage msg, boolean async) {
+        if (msg instanceof ExecutionContextAware) {
+            ExecutionContextAware msg0 = (ExecutionContextAware) msg;
+            taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () -> onMessageInternal(nodeId, msg));
+        }
+        else if (async)
+            taskExecutor().execute(IgniteUuid.VM_ID, ThreadLocalRandom.current().nextLong(1024), () -> onMessageInternal(nodeId, msg));
+        else
+            onMessageInternal(nodeId, msg);
     }
 
     /** */
@@ -278,10 +256,14 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
 
     /** */
     private void onMessageInternal(UUID nodeId, CalciteMessage msg) {
-        if (!prepareUnmarshal(msg))
-            return;
+        try {
+            prepareUnmarshal(msg);
 
-        MessageListener lsnr = Objects.requireNonNull(lsnrs.get(msg.type()));
-        lsnr.onMessage(nodeId, msg);
+            MessageListener lsnr = Objects.requireNonNull(lsnrs.get(msg.type()));
+            lsnr.onMessage(nodeId, msg);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index b3c5720..a24d4b1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
 
 /**
  * Collects meta information about a query fragment, mainly it is data
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index 1c4c940..48e9800 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
 import java.util.List;
 import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -41,6 +40,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.DistributedTable;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
@@ -106,11 +106,7 @@ public class IgniteMdDistribution implements MetadataHandler<BuiltInMetadata.Dis
      * See {@link IgniteMdDistribution#distribution(RelNode, RelMetadataQuery)}
      */
     public IgniteDistribution distribution(LogicalTableScan rel, RelMetadataQuery mq) {
-        RelDistribution distr = rel.getTable().getDistribution();
-
-        assert distr instanceof IgniteDistribution : distr;
-
-        return (IgniteDistribution) distr;
+        return rel.getTable().unwrap(DistributedTable.class).distribution();
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 7b82c33..3fdd28d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -31,11 +31,11 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Edge;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.schema.DistributedTable;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Edge;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
@@ -63,35 +63,35 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
      * @param mq Metadata query instance. Used to request appropriate metadata from node children.
      * @return Fragment meta information.
      */
-    public FragmentInfo getFragmentInfo(RelNode rel, RelMetadataQuery mq) {
+    public FragmentInfo fragmentInfo(RelNode rel, RelMetadataQuery mq) {
         throw new AssertionError();
     }
 
     /**
-     * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+     * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
      */
-    public FragmentInfo getFragmentInfo(RelSubset rel, RelMetadataQuery mq) {
+    public FragmentInfo fragmentInfo(RelSubset rel, RelMetadataQuery mq) {
         throw new AssertionError();
     }
 
     /**
-     * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+     * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
      *
      * Prunes involved partitions (hence nodes, involved in query execution) if possible.
      */
-    public FragmentInfo getFragmentInfo(IgniteFilter rel, RelMetadataQuery mq) {
-        return fragmentInfo(rel.getInput(), mq).prune(rel);
+    public FragmentInfo fragmentInfo(IgniteFilter rel, RelMetadataQuery mq) {
+        return _fragmentInfo(rel.getInput(), mq).prune(rel);
     }
 
     /**
-     * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+     * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
      */
-    public FragmentInfo getFragmentInfo(SingleRel rel, RelMetadataQuery mq) {
-        return fragmentInfo(rel.getInput(), mq);
+    public FragmentInfo fragmentInfo(SingleRel rel, RelMetadataQuery mq) {
+        return _fragmentInfo(rel.getInput(), mq);
     }
 
     /**
-     * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+     * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
      *
      * {@link LocationMappingException} may be thrown on two children nodes locations merge. This means
      * that the fragment (which part the parent node is) cannot be executed on any node and additional exchange
@@ -99,11 +99,11 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
      * exchange. After the exchange is put into the fragment and the fragment is split into two ones, fragment meta
      * information will be recalculated for all fragments.
      */
-    public FragmentInfo getFragmentInfo(Join rel, RelMetadataQuery mq) {
+    public FragmentInfo fragmentInfo(Join rel, RelMetadataQuery mq) {
         mq = RelMetadataQueryEx.wrap(mq);
 
-        FragmentInfo left = fragmentInfo(rel.getLeft(), mq);
-        FragmentInfo right = fragmentInfo(rel.getRight(), mq);
+        FragmentInfo left = _fragmentInfo(rel.getLeft(), mq);
+        FragmentInfo right = _fragmentInfo(rel.getRight(), mq);
 
         try {
             return left.merge(right);
@@ -126,16 +126,16 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
     }
 
     /**
-     * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+     * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
      */
-    public FragmentInfo getFragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
+    public FragmentInfo fragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
         return new FragmentInfo(Pair.of(rel, rel.source()));
     }
 
     /**
-     * See {@link IgniteMdFragmentInfo#getFragmentInfo(RelNode, RelMetadataQuery)}
+     * See {@link IgniteMdFragmentInfo#fragmentInfo(RelNode, RelMetadataQuery)}
      */
-    public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery mq) {
+    public FragmentInfo fragmentInfo(IgniteTableScan rel, RelMetadataQuery mq) {
         return new FragmentInfo(rel.getTable().unwrap(DistributedTable.class).mapping(Commons.context(rel)));
     }
 
@@ -145,7 +145,7 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
      * @param mq Metadata query instance.
      * @return Fragment meta information.
      */
-    public static FragmentInfo fragmentInfo(RelNode rel, RelMetadataQuery mq) {
+    public static FragmentInfo _fragmentInfo(RelNode rel, RelMetadataQuery mq) {
         return RelMetadataQueryEx.wrap(mq).getFragmentInfo(rel);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 23628f6..8437f0b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -47,11 +47,11 @@ public class IgniteMetadata {
             FragmentMetadata.Handler.class, IgniteMethod.FRAGMENT_INFO.method());
 
         /** Determines how the rows are distributed. */
-        FragmentInfo getFragmentInfo();
+        FragmentInfo fragmentInfo();
 
         /** Handler API. */
         interface Handler extends MetadataHandler<FragmentMetadata> {
-            FragmentInfo getFragmentInfo(RelNode r, RelMetadataQuery mq);
+            FragmentInfo fragmentInfo(RelNode r, RelMetadataQuery mq);
         }
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
index fd4c33b..8857b45 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/OptimisticPlanningException.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import org.apache.ignite.internal.processors.query.calcite.splitter.Edge;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Edge;
 
 /**
  *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index 28be9ef..7725d83 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -121,7 +121,7 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
     public FragmentInfo getFragmentInfo(RelNode rel) {
         for (;;) {
             try {
-                return sourceDistributionHandler.getFragmentInfo(rel, this);
+                return sourceDistributionHandler.fragmentInfo(rel, this);
             } catch (JaninoRelMetadataProvider.NoHandler e) {
                 sourceDistributionHandler = revise(e.relClass, IgniteMetadata.FragmentMetadata.DEF);
             }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/CalciteQueryFieldMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/CalciteQueryFieldMetadata.java
new file mode 100644
index 0000000..88345cd
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/CalciteQueryFieldMetadata.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+
+/**
+ *
+ */
+public class CalciteQueryFieldMetadata implements GridQueryFieldMetadata {
+    /** */
+    private String schemaName;
+
+    /** */
+    private String typeName;
+
+    /** */
+    private String fieldName;
+
+    /** */
+    private String fieldTypeName;
+
+    /** */
+    private int precision;
+
+    /** */
+    private int scale;
+
+    /** Blank constructor for external serialization. */
+    public CalciteQueryFieldMetadata() {
+    }
+
+    /**
+     * @param schemaName Schema name.
+     * @param typeName Type name.
+     * @param fieldName Field name.
+     * @param fieldTypeName Field type name.
+     * @param precision Precision.
+     * @param scale Scale.
+     */
+    public CalciteQueryFieldMetadata(String schemaName, String typeName, String fieldName, String fieldTypeName, int precision, int scale) {
+        this.schemaName = schemaName;
+        this.typeName = typeName;
+        this.fieldName = fieldName;
+        this.fieldTypeName = fieldTypeName;
+        this.precision = precision;
+        this.scale = scale;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String schemaName() {
+        return schemaName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String typeName() {
+        return typeName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String fieldName() {
+        return fieldName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String fieldTypeName() {
+        return fieldTypeName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int precision() {
+        return precision;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int scale() {
+        return scale;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeUTF(schemaName);
+        out.writeUTF(typeName);
+        out.writeUTF(fieldName);
+        out.writeUTF(fieldTypeName);
+        out.writeInt(precision);
+        out.writeInt(scale);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        schemaName = in.readUTF();
+        typeName = in.readUTF();
+        fieldName = in.readUTF();
+        fieldTypeName = in.readUTF();
+        precision = in.readInt();
+        scale = in.readInt();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Cloner.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index b3ac66e..2c9d0a7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,7 +54,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
      * @param plan Plan to clone.
      * @return New plan.
      */
-    QueryPlan go(QueryPlan plan) {
+    MultiStepPlan go(MultiStepPlan plan) {
         List<Fragment> planFragments = plan.fragments();
 
         assert !F.isEmpty(planFragments);
@@ -69,7 +69,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
 
         Collections.reverse(fragments);
 
-        return new QueryPlan(fragments);
+        return new MultiStepPlanImpl(fragments, plan.fieldsMetadata());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
index 2863288..fe0f87b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Edge.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Edge.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import org.apache.calcite.rel.RelNode;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 3d9a39b..b9ef7c3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import com.google.common.collect.ImmutableList;
 import java.util.Collections;
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.LocationMapp
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
@@ -79,7 +78,7 @@ public class Fragment implements RelSource {
      * @param mq Metadata query used for data location calculation.
      */
     public void init(MappingService mappingService, PlanningContext ctx, RelMetadataQuery mq) {
-        FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
+        FragmentInfo info = IgniteMdFragmentInfo._fragmentInfo(root, mq);
 
         mapping = fragmentMapping(mappingService, ctx, info, mq);
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 40dcf0e..1a89a17 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -24,15 +24,13 @@ import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCostImpl;
+import org.apache.calcite.plan.RelOptLattice;
+import org.apache.calcite.plan.RelOptMaterialization;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.hep.HepPlanner;
-import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.plan.volcano.VolcanoUtils;
 import org.apache.calcite.prepare.CalciteCatalogReader;
@@ -47,28 +45,28 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexExecutor;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql2rel.RelDecorrelator;
 import org.apache.calcite.sql2rel.SqlRexConvertletTable;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.Program;
-import org.apache.calcite.tools.Programs;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.GraphToRelConverter;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  * Query planer.
@@ -112,15 +110,9 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
     private final JavaTypeFactory typeFactory;
 
     /** */
-    private boolean open;
-
-    /** */
     private RelOptPlanner planner;
 
     /** */
-    private RelMetadataProvider metadataProvider;
-
-    /** */
     private SqlValidator validator;
 
     /**
@@ -143,11 +135,9 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         traitDefs = frameworkConfig.getTraitDefs();
     }
 
-
-
     /** {@inheritDoc} */
     @Override public RelTraitSet getEmptyTraitSet() {
-        return planner.emptyTraitSet();
+        return planner().emptyTraitSet();
     }
 
     /** {@inheritDoc} */
@@ -158,12 +148,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
     /** {@inheritDoc} */
     @Override public void reset() {
         planner = null;
-        metadataProvider = null;
         validator = null;
-
-        RelMetadataQuery.THREAD_PROVIDERS.remove();
-
-        open = false;
     }
 
     /**
@@ -173,35 +158,17 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         return ctx;
     }
 
-    /** */
-    private void ready() {
-        if (!open) {
-            planner = VolcanoUtils.impatient(new VolcanoPlanner(frameworkConfig.getCostFactory(), ctx));
-            planner.setExecutor(rexExecutor);
-            metadataProvider = new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner);
-
-            validator = new IgniteSqlValidator(operatorTable(), createCatalogReader(), typeFactory, conformance());
-            validator.setIdentifierExpansion(true);
-
-            for (RelTraitDef<?> def : traitDefs) {
-                planner.addRelTraitDef(def);
-            }
-
-            open = true;
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public SqlNode parse(Reader reader) throws SqlParseException {
-        return SqlParser.create(reader, parserConfig).parseStmt();
+        SqlNodeList sqlNodes = SqlParser.create(reader, parserConfig).parseStmtList();
+
+        return sqlNodes.size() == 1 ? sqlNodes.get(0) : sqlNodes;
     }
 
     /** {@inheritDoc} */
     @Override public SqlNode validate(SqlNode sqlNode) throws ValidationException {
-        ready();
-
         try {
-            return validator.validate(sqlNode);
+            return validator().validate(sqlNode);
         }
         catch (RuntimeException e) {
             throw new ValidationException(e);
@@ -210,13 +177,26 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
 
     /** {@inheritDoc} */
     @Override public Pair<SqlNode, RelDataType> validateAndGetType(SqlNode sqlNode) throws ValidationException {
-        ready();
-
-        SqlNode validatedNode = validate(sqlNode);
-        RelDataType type = validator.getValidatedNodeType(validatedNode);
+        SqlNode validatedNode = validator().validate(sqlNode);
+        RelDataType type = validator().getValidatedNodeType(validatedNode);
         return Pair.of(validatedNode, type);
     }
 
+    /**
+     * Validates a SQL statement.
+     *
+     * @param sqlNode Root node of the SQL parse tree.
+     * @return Validated node, its validated type and type's origins.
+     * @throws ValidationException if not valid
+     */
+    public ValidationResult validateAndGetTypeMetadata(SqlNode sqlNode) throws ValidationException {
+        SqlNode validatedNode = validator().validate(sqlNode);
+        RelDataType type = validator().getValidatedNodeType(validatedNode);
+        List<List<String>> origins = validator().getFieldOrigins(validatedNode);
+
+        return new ValidationResult(validatedNode, type, origins);
+    }
+
     /** {@inheritDoc} */
     @Override public RelNode convert(SqlNode sql) {
         return rel(sql).rel;
@@ -229,8 +209,6 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
      * @return Root node of relational tree.
      */
     public IgniteRel convert(RelGraph graph) {
-        ready();
-
         RelOptCluster cluster = createCluster();
         RelBuilder relBuilder = createRelBuilder(cluster, createCatalogReader());
 
@@ -239,15 +217,13 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
 
     /** Creates a cluster. */
     RelOptCluster createCluster() {
-        ready();
-
-        return createCluster(createRexBuilder());
+        RelOptCluster cluster = RelOptCluster.create(planner(), createRexBuilder());
+        cluster.setMetadataProvider(IgniteMetadata.METADATA_PROVIDER);
+        return cluster;
     }
 
     /** {@inheritDoc} */
     @Override public RelRoot rel(SqlNode sql) {
-        ready();
-
         RelOptCluster cluster = createCluster();
         SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
             .withConfig(sqlToRelConverterConfig)
@@ -256,11 +232,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
             .build();
         SqlToRelConverter sqlToRelConverter =
             new SqlToRelConverter(this, validator, createCatalogReader(), cluster, convertletTable, config);
-        RelRoot root = sqlToRelConverter.convertQuery(sql, false, true);
-        root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
-        RelBuilder relBuilder = createRelBuilder(cluster, null);
-        root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
-        return root;
+
+        return sqlToRelConverter.convertQuery(sql, false, true);
     }
 
     /**
@@ -270,8 +243,6 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
      * @return Relational nodes tree representation.
      */
     public RelGraph graph(RelNode rel) {
-        ready();
-
         if (rel.getConvention() != IgniteConvention.INSTANCE)
             throw new IllegalArgumentException("Physical node is required.");
 
@@ -280,15 +251,13 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
 
     /** {@inheritDoc} */
     @Override public RelRoot expandView(RelDataType rowType, String queryString, List<String> schemaPath, List<String> viewPath) {
-        ready();
-
         SqlParser parser = SqlParser.create(queryString, parserConfig);
         SqlNode sqlNode;
         try {
             sqlNode = parser.parseQuery();
         }
         catch (SqlParseException e) {
-            throw new RuntimeException("parse failed", e);
+            throw new IgniteSQLException("parse failed", IgniteQueryErrorCode.PARSING, e);
         }
 
         SqlConformance conformance = conformance();
@@ -305,86 +274,74 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
             .withConvertTableAccess(false)
             .build();
         SqlToRelConverter sqlToRelConverter =
-            new SqlToRelConverter(this, validator,
-                catalogReader, cluster, convertletTable, config);
+            new SqlToRelConverter(this, validator, catalogReader, cluster, convertletTable, config);
 
-        RelRoot root = sqlToRelConverter.convertQuery(sqlNode, true, false);
-        RelRoot root2 = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
-        RelBuilder relBuilder = createRelBuilder(cluster, null);
-        return root2.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
-    }
-
-    /** */
-    private RelOptCluster createCluster(RexBuilder rexBuilder) {
-        RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
-
-        cluster.setMetadataProvider(metadataProvider);
-        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(metadataProvider));
-
-        return cluster;
+        return sqlToRelConverter.convertQuery(sqlNode, true, false);
     }
 
     /** {@inheritDoc} */
     @Override public RelNode transform(int programIdx, RelTraitSet targetTraits, RelNode rel) {
-        ready();
-
-        RelTraitSet toTraits = targetTraits.simplify();
+        RelMetadataProvider clusterProvider = rel.getCluster().getMetadataProvider();
+        JaninoRelMetadataProvider threadProvider = RelMetadataQuery.THREAD_PROVIDERS.get();
+        try {
+            rel.getCluster().setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner()));
+            RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(rel.getCluster().getMetadataProvider()));
 
-        return programs.get(programIdx).run(planner, rel, toTraits, ImmutableList.of(), ImmutableList.of());
+            return programs.get(programIdx).run(planner(), rel, targetTraits.simplify(), materializations(), latices());
+        }
+        finally {
+            rel.getCluster().setMetadataProvider(clusterProvider);
+            RelMetadataQuery.THREAD_PROVIDERS.set(threadProvider);
+        }
     }
 
     /**
      * Converts one relational nodes tree into another relational nodes tree
      * based on a particular planner type, planning phase and required set of traits.
-     * @param plannerType Planner type.
-     * @param plannerPhase Planner phase.
-     * @param input Root node of relational tree.
+     * @param phase Planner phase.
      * @param targetTraits Target traits.
+     * @param rel Root node of relational tree.
      * @return The root of the new RelNode tree.
      */
-    public <T extends RelNode> T transform(PlannerType plannerType, PlannerPhase plannerPhase, RelNode input, RelTraitSet targetTraits)  {
-        ready();
-
-        RelTraitSet toTraits = targetTraits.simplify();
-
-        RelNode output;
-
-        switch (plannerType) {
-            case HEP:
-                final HepProgramBuilder programBuilder = new HepProgramBuilder();
-
-                for (RelOptRule rule : plannerPhase.getRules(Commons.context(ctx))) {
-                    programBuilder.addRuleInstance(rule);
-                }
-
-                final HepPlanner hepPlanner =
-                    new HepPlanner(programBuilder.build(), ctx, true, null, RelOptCostImpl.FACTORY);
-
-                hepPlanner.setRoot(input);
-
-                if (!input.getTraitSet().equals(targetTraits))
-                    hepPlanner.changeTraits(input, toTraits);
+    public <T extends RelNode> T transform(PlannerPhase phase, RelTraitSet targetTraits, RelNode rel)  {
+        RelMetadataProvider clusterProvider = rel.getCluster().getMetadataProvider();
+        JaninoRelMetadataProvider threadProvider = RelMetadataQuery.THREAD_PROVIDERS.get();
+        try {
+            rel.getCluster().setMetadataProvider(new CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER, planner()));
+            RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(rel.getCluster().getMetadataProvider()));
 
-                output = hepPlanner.findBestExp();
+            return (T) phase.getProgram(ctx).run(planner(), rel, targetTraits.simplify(), materializations(), latices());
+        }
+        finally {
+            rel.getCluster().setMetadataProvider(clusterProvider);
+            RelMetadataQuery.THREAD_PROVIDERS.set(threadProvider);
+        }
+    }
 
-                break;
-            case VOLCANO:
-                Program program = Programs.of(plannerPhase.getRules(Commons.context(ctx)));
+    /** {@inheritDoc} */
+    @Override public JavaTypeFactory getTypeFactory() {
+        return typeFactory;
+    }
 
-                output = program.run(planner, input, toTraits,
-                    ImmutableList.of(), ImmutableList.of());
+    /** */
+    private RelOptPlanner planner() {
+        if (planner == null) {
+            planner = VolcanoUtils.impatient(new VolcanoPlanner(frameworkConfig.getCostFactory(), ctx));
+            planner.setExecutor(rexExecutor);
 
-                break;
-            default:
-                throw new AssertionError("Unknown planner type: " + plannerType);
+            for (RelTraitDef<?> def : traitDefs)
+                planner.addRelTraitDef(def);
         }
 
-        return (T) output;
+        return planner;
     }
 
-    /** {@inheritDoc} */
-    @Override public JavaTypeFactory getTypeFactory() {
-        return typeFactory;
+    /** */
+    private SqlValidator validator() {
+        if (validator == null)
+            validator = new IgniteSqlValidator(operatorTable(), createCatalogReader(), typeFactory, conformance());
+
+        return validator;
     }
 
     /** */
@@ -409,10 +366,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
 
     /** */
     private CalciteCatalogReader createCatalogReader() {
-        SchemaPlus rootSchema = rootSchema(defaultSchema);
-
         return new CalciteCatalogReader(
-            CalciteSchema.from(rootSchema),
+            CalciteSchema.from(rootSchema(defaultSchema)),
             CalciteSchema.from(defaultSchema).path(null),
             typeFactory, connectionConfig);
     }
@@ -426,4 +381,14 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
             schema = schema.getParentSchema();
         }
     }
+
+    /** */
+    private List<RelOptLattice> latices() {
+        return ImmutableList.of(); // TODO
+    }
+
+    /** */
+    private List<RelOptMaterialization> materializations() {
+        return ImmutableList.of(); // TODO
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePrograms.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePrograms.java
new file mode 100644
index 0000000..6bbc86b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePrograms.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import org.apache.calcite.plan.RelOptLattice;
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.sql2rel.RelDecorrelator;
+import org.apache.calcite.tools.Program;
+import org.apache.calcite.tools.Programs;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ *
+ */
+public class IgnitePrograms {
+    /**
+     * Returns heuristic planer based program with given rules.
+     *
+     * @param rules Rules.
+     * @return New program.
+     */
+    public static Program hep(RuleSet rules) {
+        return (planner, rel, traits, materializations, lattices) -> {
+                final HepProgramBuilder builder = new HepProgramBuilder();
+
+                for (RelOptRule rule : rules)
+                    builder.addRuleInstance(rule);
+
+                final HepPlanner hepPlanner = new HepPlanner(builder.build(), Commons.context(rel), true,
+                    null, Commons.context(rel).frameworkConfig().getCostFactory());
+
+                for (RelOptMaterialization materialization : materializations)
+                    hepPlanner.addMaterialization(materialization);
+
+                for (RelOptLattice lattice : lattices)
+                    hepPlanner.addLattice(lattice);
+
+                hepPlanner.setRoot(rel);
+
+                return hepPlanner.findBestExp();
+        };
+    }
+
+    /**
+     * Returns cost based planer based program with given rules.
+     *
+     * @param rules Rules.
+     * @return New program.
+     */
+    public static Program cbo(RuleSet rules) {
+        return Programs.of(rules);
+    }
+
+    /**
+     * @return Query decorrelate program.
+     */
+    public static Program decorrelate() {
+        return (planner, rel, traits, materializations, lattices) -> {
+            if (Commons.context(rel).connectionConfig().forceDecorrelate())
+                return RelDecorrelator.decorrelateQuery(rel, RelFactories.LOGICAL_BUILDER.create(rel.getCluster(), null));
+
+            return rel;
+        };
+    }
+
+    /**
+     * @return A program that executes a sequence of programs.
+     */
+    public static Program sequence(Program... programs) {
+        return Programs.sequence(programs);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
new file mode 100644
index 0000000..753c511
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
+
+/**
+ * Regular query or DML
+ */
+public interface MultiStepPlan extends QueryPlan {
+    /**
+     * @return Query fragments.
+     */
+    List<Fragment> fragments();
+
+    /**
+     * @return Row metadata.
+     */
+    List<GridQueryFieldMetadata> fieldsMetadata();
+
+    /**
+     * Inits query fragments.
+     *
+     * @param mappingService Mapping service.
+     * @param ctx Planner context.
+     */
+    void init(MappingService mappingService, PlanningContext ctx);
+
+    /** {@inheritDoc} */
+    @Override default Type type() {
+        return Type.QUERY;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlanImpl.java
similarity index 75%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlanImpl.java
index be17511..282878e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlanImpl.java
@@ -15,17 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import com.google.common.collect.ImmutableList;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.util.typedef.F;
@@ -33,31 +34,41 @@ import org.apache.ignite.internal.util.typedef.F;
 /**
  * Distributed query plan.
  */
-public class QueryPlan {
+public class MultiStepPlanImpl implements MultiStepPlan {
     /** */
     private final List<Fragment> fragments;
 
+    /** */
+    private final List<GridQueryFieldMetadata> fieldsMeta;
+
     /**
      * @param fragments Query fragments.
      */
-    public QueryPlan(List<Fragment> fragments) {
-        this.fragments = fragments;
+    public MultiStepPlanImpl(List<Fragment> fragments) {
+        this(fragments, ImmutableList.of());
     }
 
     /**
-     * @return Query fragments.
+     * @param fragments Query fragments.
+     * @param fieldsMeta Fields metadata.
      */
-    public List<Fragment> fragments() {
+    public MultiStepPlanImpl(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) {
+        this.fieldsMeta = fieldsMeta;
+        this.fragments = fragments;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Fragment> fragments() {
         return fragments;
     }
 
-    /**
-     * Inits query fragments.
-     *
-     * @param mappingService Mapping service.
-     * @param ctx Planner context.
-     */
-    public void init(MappingService mappingService, PlanningContext ctx) {
+    /** {@inheritDoc} */
+    @Override public List<GridQueryFieldMetadata> fieldsMetadata() {
+        return fieldsMeta;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void init(MappingService mappingService, PlanningContext ctx) {
         int i = 0;
 
         RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
@@ -103,10 +114,8 @@ public class QueryPlan {
         }
     }
 
-    /**
-     * Clones this plan with a new cluster.
-     */
-    public QueryPlan clone(RelOptCluster cluster) {
+    /** {@inheritDoc} */
+    @Override public MultiStepPlan clone(RelOptCluster cluster) {
         return new Cloner(cluster).go(this);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 9417af2..975b2d2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -17,26 +17,40 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import org.apache.calcite.plan.volcano.AbstractConverter;
 import org.apache.calcite.rel.rules.SubQueryRemoveRule;
+import org.apache.calcite.rel.rules.TableScanRule;
+import org.apache.calcite.tools.Program;
 import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.RuleSets;
 import org.apache.ignite.internal.processors.query.calcite.rule.FilterConverter;
 import org.apache.ignite.internal.processors.query.calcite.rule.JoinConverter;
 import org.apache.ignite.internal.processors.query.calcite.rule.ProjectConverter;
 
+import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.cbo;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.decorrelate;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.hep;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.sequence;
+
 /**
  * Represents a planner phase with its description and a used rule set.
  */
 public enum PlannerPhase {
     /** */
-    SUBQUERY_REWRITE("Sub-queries rewrites") {
+    HEURISTIC_OPTIMIZATION("Heuristic optimization phase") {
         /** {@inheritDoc} */
         @Override public RuleSet getRules(PlanningContext ctx) {
             return RuleSets.ofList(
+                TableScanRule.INSTANCE,
                 SubQueryRemoveRule.FILTER,
                 SubQueryRemoveRule.PROJECT,
                 SubQueryRemoveRule.JOIN);
         }
+
+        /** {@inheritDoc} */
+        @Override public Program getProgram(PlanningContext ctx) {
+            return sequence(hep(getRules(ctx)), decorrelate());
+        }
     },
 
     /** */
@@ -44,10 +58,16 @@ public enum PlannerPhase {
         /** {@inheritDoc} */
         @Override public RuleSet getRules(PlanningContext ctx) {
             return RuleSets.ofList(
+                AbstractConverter.ExpandConversionRule.INSTANCE,
                 JoinConverter.INSTANCE,
                 ProjectConverter.INSTANCE,
                 FilterConverter.INSTANCE);
         }
+
+        /** {@inheritDoc} */
+        @Override public Program getProgram(PlanningContext ctx) {
+            return cbo(getRules(ctx));
+        }
     };
 
     /** */
@@ -66,4 +86,11 @@ public enum PlannerPhase {
      * @return Rule set.
      */
     public abstract RuleSet getRules(PlanningContext ctx);
+
+    /**
+     * Returns a program, calculated on the basis of query, planner context planner phase and rules set.
+     * @param ctx Planner context.
+     * @return Rule set.
+     */
+    public abstract Program getProgram(PlanningContext ctx);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
deleted file mode 100644
index cddc4d5..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.prepare;
-
-/**
- *
- */
-public enum PlannerType {
-    HEP,
-    VOLCANO;
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index fabe1d6..fdfd779 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -31,6 +31,7 @@ import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.QueryCancelGroup;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 
 /**
@@ -59,6 +60,9 @@ public final class PlanningContext implements Context {
     private final AffinityTopologyVersion topologyVersion;
 
     /** */
+    private final QueryCancelGroup cancelGroup;
+
+    /** */
     private final IgniteLogger logger;
 
     /** */
@@ -74,10 +78,11 @@ public final class PlanningContext implements Context {
      * Private constructor, used by a builder.
      */
     private PlanningContext(FrameworkConfig config, Context parentContext, UUID localNodeId, UUID originatingNodeId,
-        String query, Object[] parameters, AffinityTopologyVersion topologyVersion, IgniteLogger logger) {
+        String query, Object[] parameters, AffinityTopologyVersion topologyVersion, QueryCancelGroup cancelGroup, IgniteLogger logger) {
         this.parentContext = parentContext;
         this.localNodeId = localNodeId;
         this.parameters = parameters;
+        this.cancelGroup = cancelGroup;
         this.originatingNodeId = originatingNodeId == null ? localNodeId : originatingNodeId;
         this.query = query;
         this.topologyVersion = topologyVersion;
@@ -133,6 +138,13 @@ public final class PlanningContext implements Context {
     }
 
     /**
+     * @return Query cancel group.
+     */
+    public QueryCancelGroup cancelGroup() {
+        return cancelGroup;
+    }
+
+    /**
      * @return Logger.
      */
     public IgniteLogger logger() {
@@ -152,6 +164,13 @@ public final class PlanningContext implements Context {
     }
 
     /**
+     * @return Schema name.
+     */
+    public String schemaName() {
+        return schema().getName();
+    }
+
+    /**
      * @return Schema.
      */
     public SchemaPlus schema() {
@@ -215,21 +234,6 @@ public final class PlanningContext implements Context {
     }
 
     /**
-     * @return Context builder.
-     */
-    public static Builder builder(PlanningContext template) {
-        return new Builder()
-            .logger(template.logger)
-            .topologyVersion(template.topologyVersion)
-            .query(template.query)
-            .parameters(template.parameters)
-            .parentContext(template.parentContext)
-            .frameworkConfig(template.frameworkConfig)
-            .originatingNodeId(template.originatingNodeId)
-            .localNodeId(template.localNodeId);
-    }
-
-    /**
      * Planner context builder.
      */
     public static class Builder {
@@ -255,6 +259,9 @@ public final class PlanningContext implements Context {
         private AffinityTopologyVersion topologyVersion;
 
         /** */
+        private QueryCancelGroup cancelGroup;
+
+        /** */
         private IgniteLogger logger;
 
         /**
@@ -321,6 +328,15 @@ public final class PlanningContext implements Context {
         }
 
         /**
+         * @param cancelGroup Query cancel group.
+         * @return Builder for chaining.
+         */
+        public Builder cancelGroup(QueryCancelGroup cancelGroup) {
+            this.cancelGroup = cancelGroup;
+            return this;
+        }
+
+        /**
          * @param logger Logger.
          * @return Builder for chaining.
          */
@@ -336,7 +352,7 @@ public final class PlanningContext implements Context {
          */
         public PlanningContext build() {
             return new PlanningContext(frameworkConfig, parentContext, localNodeId, originatingNodeId, query,
-                parameters, topologyVersion, logger);
+                parameters, topologyVersion, cancelGroup, logger);
         }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
new file mode 100644
index 0000000..cab5a88
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import org.apache.calcite.plan.RelOptCluster;
+
+/**
+ *
+ */
+public interface QueryPlan {
+    /** Query type */
+    enum Type { QUERY, DML, DDL }
+
+    /**
+     * @return Query type.
+     */
+    Type type();
+
+    /**
+     * Clones this plan with a new cluster.
+     */
+    QueryPlan clone(RelOptCluster cluster);
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
index 0f2586e..fe5f628 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
+import java.util.List;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
 
 /**
@@ -30,5 +30,5 @@ public interface QueryPlanCache extends Service {
      * @param factory Factory method to generate a plan on cache miss.
      * @return Query plan.
      */
-    QueryPlan queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory);
+    List<QueryPlan> queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 9605be4..df371e2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener;
@@ -39,7 +39,7 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
     private GridInternalSubscriptionProcessor subscriptionProcessor;
 
     /** */
-    private volatile Map<CacheKey, QueryPlan> cache;
+    private volatile Map<CacheKey, List<QueryPlan>> cache;
 
     /**
      * @param ctx Kernal context.
@@ -70,20 +70,23 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public QueryPlan queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory) {
-        Map<CacheKey, QueryPlan> cache = this.cache;
+    /** {@inheritDoc}
+     * @return*/
+    @Override public List<QueryPlan> queryPlan(PlanningContext ctx, CacheKey key, QueryPlanFactory factory) {
+        Map<CacheKey, List<QueryPlan>> cache = this.cache;
 
-        QueryPlan template = cache.get(key);
+        List<QueryPlan> template = cache.get(key);
 
         if (template != null)
-            return template.clone(ctx.createCluster());
-
-        QueryPlan plan = factory.create(ctx);
+            return Commons.transform(template, t-> t.clone(ctx.createCluster()));
+        else {
+            List<QueryPlan> prepared = factory.create(ctx);
 
-        cache.putIfAbsent(key, plan.clone(Commons.EMPTY_CLUSTER));
+            if (prepared.size() == 1) // do not cache multiline queries.
+                cache.putIfAbsent(key, Commons.transform(prepared, p -> p.clone(Commons.EMPTY_CLUSTER)));
 
-        return plan;
+            return prepared;
+        }
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanFactory.java
index da9ad4d..f3391d3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanFactory.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
+import java.util.List;
 
 /**
  *
@@ -27,5 +27,5 @@ public interface QueryPlanFactory {
      * @param ctx Planning context.
      * @return Query plan.
      */
-    QueryPlan create(PlanningContext ctx);
+    List<QueryPlan> create(PlanningContext ctx);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
similarity index 92%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
index 30574c4..1f1cbf0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSource.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSource.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 
 /**
  *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
index 9c8a735..8afbb01 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelSourceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelSourceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.io.Serializable;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
index 385bae3..1dcb6b4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTarget.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTarget.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
index a8fe214..570b81f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/RelTargetImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RelTargetImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.io.Serializable;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RowMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RowMetadata.java
new file mode 100644
index 0000000..824d815
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/RowMetadata.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class RowMetadata {
+   /** */
+   private final List<GridQueryFieldMetadata> fieldsMeta;
+
+   /** */
+   public RowMetadata(@NotNull List<GridQueryFieldMetadata> fieldsMeta) {
+      this.fieldsMeta = fieldsMeta;
+   }
+
+   /**
+    * @return Query metadata.
+    */
+   public List<GridQueryFieldMetadata> fieldsMetadata() {
+      return fieldsMeta;
+   }
+   /**
+    * Gets number of columns in a row.
+    *
+    * @return row size.
+    */
+   public int fieldsCount() {
+      return fieldsMeta.size();
+   }
+
+   /**
+    * Gets field name.
+    *
+    * @param idx field index.
+    * @return Field name.
+    */
+   public String fieldName(int idx){
+      return fieldsMeta.get(idx).fieldName();
+   }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
similarity index 91%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index adda757..74d093a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,19 +37,24 @@ import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
 /**
  * Splits a query into a list of query fragments.
  */
-public class Splitter implements IgniteRelVisitor<IgniteRel>, RelOp<IgniteRel, QueryPlan> {
+public class Splitter implements IgniteRelVisitor<IgniteRel>, RelOp<IgniteRel, List<Fragment>> {
     /** */
     private List<Fragment> fragments;
 
     /** {@inheritDoc} */
-    @Override public QueryPlan go(IgniteRel root) {
+    @Override public List<Fragment> go(IgniteRel root) {
         fragments = new ArrayList<>();
 
-        fragments.add(new Fragment(visit(root)));
+        try {
+            fragments.add(new Fragment(visit(root)));
 
-        Collections.reverse(fragments);
+            Collections.reverse(fragments);
 
-        return new QueryPlan(fragments);
+            return fragments;
+        }
+        finally {
+            fragments = null;
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ValidationResult.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ValidationResult.java
new file mode 100644
index 0000000..bb7d8e7
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ValidationResult.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlNode;
+
+/**
+ *
+ */
+public class ValidationResult {
+    /** */
+    private final SqlNode sqlNode;
+
+    /** */
+    private final RelDataType dataType;
+
+    /** */
+    private final List<List<String>> origins;
+
+    /**
+     *
+     * @param sqlNode Validated SQL node.
+     * @param dataType Validated type.
+     * @param origins Type fields provenance.
+     */
+    public ValidationResult(SqlNode sqlNode, RelDataType dataType, List<List<String>> origins) {
+        this.sqlNode = sqlNode;
+        this.dataType = dataType;
+        this.origins = origins;
+    }
+
+    /**
+     * @return Validated SQL node.
+     */
+    public SqlNode sqlNode() {
+        return sqlNode;
+    }
+
+    /**
+     * @return Validated type.
+     */
+    public RelDataType dataType() {
+        return dataType;
+    }
+
+    /**
+     * @return Type fields provenance.
+     */
+    public List<List<String>> origins() {
+        return origins;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
index 6f66ab8..34c4e5e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteConvention.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.AbstractConverter;
 
 /**
  * Ignite convention trait.
@@ -40,12 +38,6 @@ public class IgniteConvention extends Convention.Impl {
     }
 
     /** {@inheritDoc} */
-    @Override public void register(RelOptPlanner planner) {
-        // This convention relies on AbstractConverter logic, so, ExpandConversionRule need to be registered.
-        planner.addRule(AbstractConverter.ExpandConversionRule.INSTANCE);
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
         // use converters for physical nodes only.
         return toTraits.contains(INSTANCE) && fromTraits.contains(INSTANCE);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 5289c90..78083a1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index 4a4e4b5..e89bece 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
 
 /**
  * Relational expression that iterates over its input
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 83f2453..3247458 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -125,6 +125,7 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
     /** */
     private void rebuild() {
         SchemaPlus schema = Frameworks.createRootSchema(false);
+        schema.add("PUBLIC", new IgniteSchema("PUBLIC"));
         schemas.forEach(schema::add);
         schema(schema);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index 31c607f..ed43db0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -18,14 +18,16 @@
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
 import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -47,6 +49,9 @@ public class TableDescriptorImpl implements TableDescriptor {
     private final Object affinityIdentity;
 
     /** */
+    private final ColumnDescriptor[] descriptors;
+
+    /** */
     private final int affinityFieldIdx;
 
     /** */
@@ -55,7 +60,31 @@ public class TableDescriptorImpl implements TableDescriptor {
         this.typeDesc = typeDesc;
         this.affinityIdentity = affinityIdentity;
 
-        affinityFieldIdx = lookupAffinityIndex(typeDesc);
+        String affinityField = typeDesc.affinityKey(); int affinityFieldIdx = 0;
+
+        List<ColumnDescriptor> descriptors = new ArrayList<>();
+
+        descriptors.add(new KeyValDescriptor(QueryUtils.KEY_FIELD_NAME, typeDesc.keyClass(), true, true));
+        descriptors.add(new KeyValDescriptor(QueryUtils.VAL_FIELD_NAME, typeDesc.keyClass(), true, false));
+
+        for (String field : this.typeDesc.fields().keySet()) {
+            if (Objects.equals(affinityField, field)) {
+                assert affinityFieldIdx == 0;
+
+                affinityFieldIdx = descriptors.size();
+            }
+
+            if (Objects.equals(field, typeDesc.keyFieldAlias()))
+                descriptors.add(new KeyValDescriptor(typeDesc.keyFieldAlias(), typeDesc.keyClass(), false, true));
+            else if (Objects.equals(field, typeDesc.valueFieldAlias()))
+                descriptors.add(new KeyValDescriptor(typeDesc.valueFieldAlias(), typeDesc.valueClass(), false, false));
+            else
+                descriptors.add(new FieldDescriptor(typeDesc.property(field)));
+        }
+
+        this.descriptors = descriptors.toArray(new ColumnDescriptor[0]);
+
+        this.affinityFieldIdx = affinityFieldIdx;
     }
 
     /** {@inheritDoc} */
@@ -64,11 +93,8 @@ public class TableDescriptorImpl implements TableDescriptor {
 
         RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(f);
 
-        b.add(QueryUtils.KEY_FIELD_NAME, f.createSystemType(typeDesc.keyClass()));
-        b.add(QueryUtils.VAL_FIELD_NAME, f.createSystemType(typeDesc.valueClass()));
-
-        for (Map.Entry<String, Class<?>> field : typeDesc.fields().entrySet())
-            b.add(field.getKey(), f.createJavaType(field.getValue()));
+        for (ColumnDescriptor desc : descriptors)
+            b.add(desc.name(), desc.type(f));
 
         return b.build();
     }
@@ -98,34 +124,87 @@ public class TableDescriptorImpl implements TableDescriptor {
 
     /** {@inheritDoc} */
     @Override public <T> T toRow(ExecutionContext ectx, CacheDataRow row) throws IgniteCheckedException {
-        Object[] res = new Object[typeDesc.fields().size() + 2];
+        Object[] res = new Object[descriptors.length];
 
-        int i = 0;
+        for (int i = 0; i < descriptors.length; i++)
+            res[i] = descriptors[i].value(ectx, cctx, row);
 
-        res[i++] = cctx.unwrapBinaryIfNeeded(row.key(), ectx.keepBinary());
-        res[i++] = cctx.unwrapBinaryIfNeeded(row.value(), ectx.keepBinary());
+        return (T) res;
+    }
 
-        for (String field : typeDesc.fields().keySet())
-            res[i++] = cctx.unwrapBinaryIfNeeded(typeDesc.value(field, row.key(), row.value()), ectx.keepBinary());
+    /** */
+    private interface ColumnDescriptor {
+        /** */
+        String name();
 
-        return (T) res;
+        /** */
+        RelDataType type(IgniteTypeFactory f);
+
+        /** */
+        Object value(ExecutionContext ectx, GridCacheContext<?,?> cctx, CacheDataRow src) throws IgniteCheckedException;
     }
 
     /** */
-    private static int lookupAffinityIndex(GridQueryTypeDescriptor queryTypeDesc) {
-        if (queryTypeDesc.affinityKey() != null) {
-            int idx = 2;
+    private static class KeyValDescriptor implements ColumnDescriptor {
+        /** */
+        private final String name;
 
-            String affField = queryTypeDesc.affinityKey();
+        /** */
+        private final Class<?> type;
 
-            for (String s : queryTypeDesc.fields().keySet()) {
-                if (affField.equals(s))
-                    return idx;
+        /** */
+        private final boolean isSystem;
 
-                idx++;
-            }
+        /** */
+        private final boolean isKey;
+
+        /** */
+        private KeyValDescriptor(String name, Class<?> type, boolean isSystem, boolean isKey) {
+            this.name = name;
+            this.type = type;
+            this.isSystem = isSystem;
+            this.isKey = isKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType type(IgniteTypeFactory f) {
+            return isSystem ? f.createSystemType(type) : f.createJavaType(type);
         }
 
-        return 0;
+        /** {@inheritDoc} */
+        @Override public Object value(ExecutionContext ectx, GridCacheContext<?,?> cctx, CacheDataRow src) throws IgniteCheckedException {
+            return cctx.unwrapBinaryIfNeeded(isKey ? src.key() : src.value(), ectx.keepBinary());
+        }
+    }
+
+    /** */
+    private static class FieldDescriptor implements ColumnDescriptor {
+        /** */
+        private final GridQueryProperty desc;
+
+        /** */
+        private FieldDescriptor(GridQueryProperty desc) {
+            this.desc = desc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return desc.name();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType type(IgniteTypeFactory f) {
+            return f.createJavaType(desc.type());
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object value(ExecutionContext ectx, GridCacheContext<?,?> cctx, CacheDataRow src) throws IgniteCheckedException {
+            return cctx.unwrapBinaryIfNeeded(desc.value(src.key(), src.value()), ectx.keepBinary());
+        }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
index 586ef09..24ff49c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
@@ -19,12 +19,12 @@ package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
 import java.util.List;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSource;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelSourceImpl;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
 import org.apache.ignite.internal.processors.query.calcite.serialize.type.Types;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSource;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelSourceImpl;
 
 
 /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
index b81b555..e8c161f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
@@ -21,9 +21,9 @@ import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.splitter.RelTarget;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index a586a46..ba363a6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -30,6 +30,7 @@ import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.QueryContext;
@@ -165,7 +166,23 @@ public final class Commons {
     /**
      * @param o Object to close.
      */
-    public static void close(Object o) {
+    public static void close(Object o) throws Exception {
+        if (o instanceof AutoCloseable)
+            ((AutoCloseable) o).close();
+    }
+
+    /**
+     * @param o Object to close.
+     */
+    public static void close(Object o, IgniteLogger log) {
+        if (o instanceof AutoCloseable)
+            U.close((AutoCloseable) o, log);
+    }
+
+    /**
+     * @param o Object to close.
+     */
+    public static void closeQuiet(Object o) {
         if (o instanceof AutoCloseable)
             U.closeQuiet((AutoCloseable) o);
     }
@@ -174,7 +191,7 @@ public final class Commons {
      * @param o Object to close.
      * @param e Exception, what causes close.
      */
-    public static void close(Object o, @Nullable Exception e) {
+    public static void closeQuiet(Object o, @Nullable Exception e) {
         if (!(o instanceof AutoCloseable))
             return;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
new file mode 100644
index 0000000..10331ea
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+/**
+ *
+ */
+class ConvertingClosableIterator<T, R> implements Iterator<R>, AutoCloseable {
+    /** */
+    private final Iterator<T> it;
+
+    /** */
+    private final Function<T, R> converter;
+
+    /** */
+    public ConvertingClosableIterator(Iterator<T> it, Function<T, R> converter) {
+        this.it = it;
+        this.converter = converter;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public boolean hasNext() {
+        return it.hasNext();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public R next() {
+        return converter.apply(it.next());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void close() throws Exception {
+        if (it instanceof AutoCloseable)
+            ((AutoCloseable) it).close();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index c2a11f3..c71f6a0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetada
  */
 public enum IgniteMethod {
     DERIVED_DISTRIBUTIONS(DerivedDistribution.class, "deriveDistributions"),
-    FRAGMENT_INFO(FragmentMetadata.class, "getFragmentInfo");
+    FRAGMENT_INFO(FragmentMetadata.class, "fragmentInfo");
 
     /** */
     private final Method method;
@@ -37,7 +37,7 @@ public enum IgniteMethod {
      * @param methodName Method name.
      * @param argumentTypes Method parameters types.
      */
-    IgniteMethod(Class clazz, String methodName, Class... argumentTypes) {
+    IgniteMethod(Class<?> clazz, String methodName, Class<?>... argumentTypes) {
         method = Types.lookupMethod(clazz, methodName, argumentTypes);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
index 1b952f9..ba921c4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
@@ -17,57 +17,61 @@
 
 package org.apache.ignite.internal.processors.query.calcite.util;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Spliterators;
 import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.NotNull;
 
 /**
  *
  */
-public class ListFieldsQueryCursor<T> implements FieldsQueryCursor<List<?>> {
+public class ListFieldsQueryCursor<T> implements FieldsQueryCursor<List<?>>, QueryCursorEx<List<?>> {
     /** */
-    private final RelDataType rowType;
+    private final Iterator<List<?>> it;
 
     /** */
-    private final Iterator<T> it;
-
-    /** */
-    private final Function<T, List<?>> converter;
+    private final List<GridQueryFieldMetadata> fieldsMeta;
 
     /**
-     * @param rowType Row data type description.
      * @param it Iterator.
      * @param converter Row converter.
+     * @param fieldsMeta Fields metadata.
      */
-    public ListFieldsQueryCursor(RelDataType rowType, Iterator<T> it, Function<T, List<?>> converter) {
-        this.rowType = rowType;
-        this.it = it;
-        this.converter = converter;
+    public ListFieldsQueryCursor(Iterator<T> it, Function<T, List<?>> converter, List<GridQueryFieldMetadata> fieldsMeta) {
+        this.it = new ConvertingClosableIterator<>(it, converter);
+        this.fieldsMeta = fieldsMeta;
     }
 
     /** {@inheritDoc} */
-    @Override public String getFieldName(int idx) {
-        return rowType.getFieldList().get(idx).getName();
+    @NotNull @Override public Iterator<List<?>> iterator() {
+        return it;
     }
 
     /** {@inheritDoc} */
-    @Override public int getColumnsCount() {
-        return rowType.getFieldCount();
+    @Override public List<List<?>> getAll() {
+        ArrayList<List<?>> res = new ArrayList<>();
+
+        try {
+            getAll(res::add);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+
+        return res;
     }
 
     /** {@inheritDoc} */
-    @Override public List<List<?>> getAll() {
+    @Override public void getAll(Consumer<List<?>> c) throws IgniteCheckedException {
         try {
-            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false)
-                .map(converter)
-                .collect(Collectors.toList());
+            while (it.hasNext())
+                c.consume(it.next());
         }
         finally {
             close();
@@ -75,12 +79,27 @@ public class ListFieldsQueryCursor<T> implements FieldsQueryCursor<List<?>> {
     }
 
     /** {@inheritDoc} */
-    @Override public void close() {
-        Commons.close(it);
+    @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+        return fieldsMeta;
     }
 
     /** {@inheritDoc} */
-    @NotNull @Override public Iterator<List<?>> iterator() {
-        return F.iterator(it, converter::apply, true);
+    @Override public String getFieldName(int idx) {
+        return fieldsMeta.get(idx).fieldName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getColumnsCount() {
+        return fieldsMeta.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isQuery() {
+        return true; // TODO pass and check QueryPlan.Type
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        Commons.closeQuiet(it);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
index 479a8ef..118b5c0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScan.java
@@ -182,7 +182,7 @@ public class TableScan implements Iterable<Object[]> {
                 return this;
             }
             catch (Exception e) {
-                Commons.close(this, e);
+                Commons.closeQuiet(this, e);
 
                 throw e;
             }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index eb09e95..4f8cf09 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -81,13 +81,17 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
         QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
 
-        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "select * from DEVELOPER d, PROJECT p where d.projectId = p._key and d._key = ?", 1);
+        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
+            "" +
+                "select * from DEVELOPER d, PROJECT p where d.projectId = p._key and d._key = ?;" +
+                "select * from DEVELOPER d, PROJECT p where d.projectId = p._key and d._key = ?"
+            , 0,1);
 
-        List<List<?>> all = F.first(query).getAll();
 
-        assertTrue(!F.isEmpty(all));
-        assertEquals(1, all.size());
-        assertEqualsCollections(Arrays.asList("Roman", 0, "Ignite"), F.first(all));
+        assertEquals(2, query.size());
+
+        assertEqualsCollections(Arrays.asList("Igor", 1, "Calcite"), F.first(query.get(0).getAll()));
+        assertEqualsCollections(Arrays.asList("Roman", 0, "Ignite"), F.first(query.get(1).getAll()));
     }
 
     /** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index bca71ad..0857e7f5 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -60,10 +60,13 @@ import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessag
 import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlanImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
@@ -72,9 +75,6 @@ import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.SortedTable;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
@@ -480,7 +480,7 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = relRoot.rel;
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -571,13 +571,15 @@ public class PlannerTest extends GridCommonAbstractTest {
 
             RelNode rel = relRoot.rel;
 
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
             // Transformation chain
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
@@ -687,18 +689,18 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = planner.convert(sqlNode);
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             assertNotNull(rel);
 
-            QueryPlan plan = new Splitter().go((IgniteRel) rel);
+            MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) rel));
 
             assertNotNull(plan);
 
@@ -832,21 +834,21 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = relRoot.rel;
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
 
         assertNotNull(plan);
 
@@ -963,17 +965,17 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = relRoot.rel;
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             RelTraitSet desired = rel.getCluster()
                 .traitSetOf(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single());
 
-            RelNode phys = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            RelNode phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             assertNotNull(phys);
 
-            QueryPlan plan = new Splitter().go((IgniteRel) phys);
+            MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) phys));
 
             assertNotNull(plan);
 
@@ -1198,21 +1200,21 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = relRoot.rel;
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
 
         assertNotNull(plan);
 
@@ -1320,21 +1322,21 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = relRoot.rel;
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
 
         assertNotNull(plan);
 
@@ -1441,21 +1443,21 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = relRoot.rel;
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
 
         assertNotNull(plan);
 
@@ -1562,21 +1564,21 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = relRoot.rel;
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
 
         assertNotNull(plan);
 
@@ -1680,21 +1682,21 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode rel = relRoot.rel;
 
             // Transformation chain
-            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
 
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteConvention.INSTANCE)
                 .replace(IgniteDistributions.single())
                 .simplify();
 
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.OPTIMIZATION, rel, desired);
+            rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
         }
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        MultiStepPlan plan = new MultiStepPlanImpl(new Splitter().go((IgniteRel) relRoot.rel));
 
         assertNotNull(plan);
 
@@ -1785,13 +1787,13 @@ public class PlannerTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected boolean prepareMarshal(Message msg) {
-            return true;
+        @Override protected void prepareMarshal(Message msg) {
+            // No-op;
         }
 
         /** {@inheritDoc} */
-        @Override protected boolean prepareUnmarshal(Message msg) {
-            return true;
+        @Override protected void prepareUnmarshal(Message msg) {
+            // No-op;
         }
     }
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractExecutionTest.java
index bb430954..96cf859 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractExecutionTest.java
@@ -163,13 +163,13 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected boolean prepareMarshal(Message msg) {
-            return true;
+        @Override protected void prepareMarshal(Message msg) {
+            // No-op;
         }
 
         /** {@inheritDoc} */
-        @Override protected boolean prepareUnmarshal(Message msg) {
-            return true;
+        @Override protected void prepareUnmarshal(Message msg) {
+            // No-op;
         }
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolderTest.java
new file mode 100644
index 0000000..7e99f4b
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ClosableIteratorsHolderTest extends GridCommonAbstractTest {
+    /** */
+    private static final int GENERATED = 10000;
+
+    /** */
+    private Set<Iterator<?>> iterators;
+
+    /** */
+    private ClosableIteratorsHolder holder;
+
+    @Before
+    public void setup() throws Exception {
+        iterators = Collections.newSetFromMap(new ConcurrentHashMap<>());
+        holder = new ClosableIteratorsHolder(log());
+        holder.init();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        holder.tearDown();
+
+        holder = null;
+        iterators = null;
+    }
+
+    @Test
+    public void iterator() throws Exception {
+        for (int i = 0; i < GENERATED; i++)
+            holder.iterator(newIterator());
+
+        System.gc();
+
+        assertTrue(iterators.size() < GENERATED);
+    }
+
+    /** */
+    private Iterator<?> newIterator() {
+        final ClosableIterator iterator = new ClosableIterator();
+        iterators.add(iterator);
+        return iterator;
+    }
+
+    /** */
+    private class ClosableIterator implements Iterator<Object>, AutoCloseable {
+        /** */
+        public final byte[] data;
+
+        private ClosableIterator() {
+            data = new byte[4096];
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object next() {
+            throw new NoSuchElementException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            Optional.ofNullable(iterators)
+                .ifPresent(set -> set.remove(this));
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/OutboxTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/OutboxTest.java
index 96cd789..aa5dbe6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/OutboxTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/OutboxTest.java
@@ -129,19 +129,19 @@ public class OutboxTest extends GridCommonAbstractTest {
         private List<?> lastBatch;
 
         /** {@inheritDoc} */
-        @Override public void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
+        @Override public void sendBatch(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
             ids.add(batchId);
 
             lastBatch = rows;
         }
 
         /** {@inheritDoc} */
-        @Override public void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+        @Override public void acknowledge(Inbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
             throw new AssertionError();
         }
 
         /** {@inheritDoc} */
-        @Override public void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+        @Override public void cancel(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
             throw new AssertionError();
         }
     }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java
new file mode 100644
index 0000000..1540e14
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class JdbcQueryTest extends GridCommonAbstractTest {
+    /** URL. */
+    private final String url = "jdbc:ignite:thin://127.0.0.1?useExperimentalQueryEngine=true";
+
+    /** Nodes count. */
+    private final int nodesCnt = 3;
+
+    /** Connection. */
+    private Connection conn;
+
+    /** Statement. */
+    private Statement stmt;
+
+    /**
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testSimpleQuery() throws SQLException {
+        stmt.execute("CREATE TABLE Person(\"id\" INT, PRIMARY KEY(\"id\"), \"name\" VARCHAR)");
+
+        doSleep(1000);
+
+        stmt.executeUpdate("INSERT INTO Person VALUES (10, 'Name')");
+        try (ResultSet rs = stmt.executeQuery("select p.*, (1+1) as synthetic from Person p")) {
+            assertTrue(rs.next());
+            assertEquals(10, rs.getInt(1));
+            assertEquals("Name", rs.getString(2));
+            assertEquals(2, rs.getInt(3));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(nodesCnt);
+        conn = DriverManager.getConnection(url);
+        conn.setSchema("PUBLIC");
+        stmt = conn.createStatement();
+
+        assert stmt != null;
+        assert !stmt.isClosed();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (stmt != null && !stmt.isClosed()) {
+            stmt.close();
+
+            assert stmt.isClosed();
+        }
+
+        conn.close();
+
+        assert stmt.isClosed();
+        assert conn.isClosed();
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index ebcae81..705cf05 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.ContinuousExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.OutboxTest;
+import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -34,7 +35,8 @@ import org.junit.runners.Suite;
     OutboxTest.class,
     ExecutionTest.class,
     ContinuousExecutionTest.class,
-    CalciteQueryProcessorTest.class
+    CalciteQueryProcessorTest.class,
+    JdbcQueryTest.class
 })
 public class IgniteCalciteTestSuite {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
index 31d9519..442cda6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
@@ -503,4 +503,16 @@ public interface ConnectionProperties {
      * @param connTimeout Connection timeout in milliseconds.
      */
     public void setConnectionTimeout(@Nullable Integer connTimeout) throws SQLException;
+
+    /**
+     * @return {@code True} if experimental query engine is enabled for a connection.
+     */
+    public boolean isUseExperimentalQueryEngine();
+
+    /**
+     * Sets use experimental query engine flag.
+     *
+     * @param useExperimentalQueryEngine {@code True} if experimental query engine is enabled for a connection.
+     */
+    public void setUseExperimentalQueryEngine(boolean useExperimentalQueryEngine);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
index 7b14aec..db5d4b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
@@ -229,6 +229,10 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
             " Zero means there is no limits.",
         0, false, 0, Integer.MAX_VALUE);
 
+    /** Whether an experimental SQL engine enabled for a connection. */
+    private BooleanProperty useExperimentalQueryEngine = new BooleanProperty("useExperimentalQueryEngine",
+        "Enables experimental query engine.", false, false);
+
     /** Properties array. */
     private final ConnectionProperty [] propsArray = {
         distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseServerCursor,
@@ -244,7 +248,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
             partitionAwarenessSQLCacheSize,
             partitionAwarenessPartDistributionsCacheSize,
         qryTimeout,
-        connTimeout
+        connTimeout,
+        useExperimentalQueryEngine
     };
 
     /** {@inheritDoc} */
@@ -623,6 +628,16 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
         connTimeout.setValue(timeout);
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isUseExperimentalQueryEngine() {
+        return useExperimentalQueryEngine.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setUseExperimentalQueryEngine(boolean useExperimentalQueryEngine) {
+        this.useExperimentalQueryEngine.setValue(useExperimentalQueryEngine);
+    }
+
     /**
      * @param url URL connection.
      * @param props Environment properties.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 8711182..680d7a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -82,8 +82,11 @@ public class JdbcThinTcpIo {
     /** Version 2.8.0. */
     private static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0);
 
+    /** Version 2.9.0. */
+    private static final ClientListenerProtocolVersion VER_2_9_0 = ClientListenerProtocolVersion.create(2, 9, 0);
+
     /** Current version. */
-    private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_8_0;
+    private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_9_0;
 
     /** Initial output stream capacity for handshake. */
     private static final int HANDSHAKE_MSG_SIZE = 13;
@@ -254,6 +257,9 @@ public class JdbcThinTcpIo {
             JdbcUtils.writeNullableInteger(writer, connProps.getUpdateBatchSize());
         }
 
+        if (ver.compareTo(VER_2_9_0) >= 0)
+            writer.writeBoolean(connProps.isUseExperimentalQueryEngine());
+
         if (!F.isEmpty(connProps.getUsername())) {
             assert ver.compareTo(VER_2_5_0) >= 0 : "Authentication is supported since 2.5";
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 02cb107..b5908cb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -186,7 +186,7 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T>, FieldsQueryCursor<T
      * @return {@code true} if this cursor corresponds to a {@link ResultSet} as a result of query,
      * {@code false} if query was modifying operation like INSERT, UPDATE, or DELETE.
      */
-    public boolean isQuery() {
+    @Override public boolean isQuery() {
         return isQry;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
index 828c650..bda2f8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.sql.ResultSet;
 import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -39,6 +40,12 @@ public interface QueryCursorEx<T> extends QueryCursor<T> {
     public List<GridQueryFieldMetadata> fieldsMeta();
 
     /**
+     * @return {@code true} if this cursor corresponds to a {@link ResultSet} as a result of query,
+     * {@code false} if query was modifying operation like INSERT, UPDATE, or DELETE.
+     */
+    public boolean isQuery();
+
+    /**
      * Query value consumer.
      */
     public static interface Consumer<T> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 7ce21ba..db8fb4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -64,8 +64,11 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
     /** Version 2.8.0: adds query id in order to implement cancel feature, partition awareness support: IEP-23.*/
     static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0);
 
+    /** Version 2.8.0: adds experimental query engine support */
+    static final ClientListenerProtocolVersion VER_2_9_0 = ClientListenerProtocolVersion.create(2, 9, 0);
+
     /** Current version. */
-    public static final ClientListenerProtocolVersion CURRENT_VER = VER_2_8_0;
+    public static final ClientListenerProtocolVersion CURRENT_VER = VER_2_9_0;
 
     /** Supported versions. */
     private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();
@@ -144,6 +147,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
 
         boolean lazyExec = false;
         boolean skipReducerOnUpdate = false;
+        boolean useExperimentalQueryEngine = false;
 
         NestedTxMode nestedTxMode = NestedTxMode.DEFAULT;
         AuthorizationContext actx = null;
@@ -176,6 +180,9 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
             updateBatchSize = JdbcUtils.readNullableInteger(reader);
         }
 
+        if (ver.compareTo(VER_2_9_0) >= 0)
+            useExperimentalQueryEngine = reader.readBoolean();
+
         if (ver.compareTo(VER_2_5_0) >= 0) {
             String user = null;
             String passwd = null;
@@ -209,7 +216,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte
         };
 
         handler = new JdbcRequestHandler(busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder,
-            collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, nestedTxMode,
+            collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, useExperimentalQueryEngine, nestedTxMode,
             dataPageScanEnabled, updateBatchSize, actx, ver, this);
 
         handler.start();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
index cd566ed..ef6c8ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
@@ -21,7 +21,7 @@ import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 
 /**
@@ -38,10 +38,10 @@ class JdbcQueryCursor extends JdbcCursor {
     private long fetched;
 
     /** Query result rows. */
-    private final QueryCursorImpl<List<Object>> cur;
+    private final QueryCursorEx<List<?>> cur;
 
     /** Query results iterator. */
-    private Iterator<List<Object>> iter;
+    private Iterator<List<?>> iter;
 
     /**
      * @param pageSize Fetch size.
@@ -49,7 +49,7 @@ class JdbcQueryCursor extends JdbcCursor {
      * @param cur Query cursor.
      * @param reqId Id of the request that created given cursor.
      */
-    JdbcQueryCursor(int pageSize, int maxRows, QueryCursorImpl<List<Object>> cur, long reqId) {
+    JdbcQueryCursor(int pageSize, int maxRows, QueryCursorEx<List<?>> cur, long reqId) {
         super(reqId);
 
         this.pageSize = pageSize;
@@ -73,7 +73,7 @@ class JdbcQueryCursor extends JdbcCursor {
         List<List<Object>> items = new ArrayList<>(fetchSize);
 
         for (int i = 0; i < fetchSize && iter.hasNext(); i++) {
-            items.add(iter.next());
+            items.add((List<Object>)iter.next());
 
             fetched++;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 1510210..fea59b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteVersionUtils;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
@@ -52,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
@@ -61,6 +63,8 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.NestedTxMode;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.QueryEngine;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
@@ -161,6 +165,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
     /** Register that keeps non-cancelled requests. */
     private Map<Long, JdbcQueryDescriptor> reqRegister = new HashMap<>();
 
+    /** Experimental query engine. */
+    private QueryEngine experimentalQueryEngine;
+
     /**
      * Constructor.
      * @param busyLock Shutdown latch.
@@ -173,6 +180,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
      * @param autoCloseCursors Flag to automatically close server cursors.
      * @param lazy Lazy query execution flag.
      * @param skipReducerOnUpdate Skip reducer on update flag.
+     * @param useExperimentalQueryEngine Enable experimental query engine.
      * @param dataPageScanEnabled Enable scan data page mode.
      * @param updateBatchSize Size of internal batch for DML queries.
      * @param actx Authentication context.
@@ -190,6 +198,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         boolean autoCloseCursors,
         boolean lazy,
         boolean skipReducerOnUpdate,
+        boolean useExperimentalQueryEngine,
         NestedTxMode nestedTxMode,
         @Nullable Boolean dataPageScanEnabled,
         @Nullable Integer updateBatchSize,
@@ -228,6 +237,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         this.protocolVer = protocolVer;
         this.actx = actx;
 
+        if (useExperimentalQueryEngine) {
+            for (GridComponent cmp : connCtx.kernalContext().components()) {
+                if (!(cmp instanceof QueryEngine))
+                    continue;
+
+                experimentalQueryEngine = (QueryEngine) cmp;
+
+                break;
+            }
+        }
+
         log = connCtx.kernalContext().log(getClass());
 
         // TODO IGNITE-9484 Do not create worker if there is a possibility to unbind TX from threads.
@@ -612,8 +632,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
             qry.setSchema(schemaName);
 
-            List<FieldsQueryCursor<List<?>>> results = connCtx.kernalContext().query().querySqlFields(null, qry,
-                cliCtx, true, protocolVer.compareTo(VER_2_3_0) < 0, cancel);
+            List<FieldsQueryCursor<List<?>>> results = querySqlFields(qry, cancel);
 
             FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
 
@@ -633,7 +652,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
             if (results.size() == 1) {
                 JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), req.maxRows(),
-                    (QueryCursorImpl)fieldsCur, req.requestId());
+                    (QueryCursorEx<List<?>>)fieldsCur, req.requestId());
 
                 jdbcCursors.put(cur.cursorId(), cur);
 
@@ -641,7 +660,10 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                 JdbcQueryExecuteResult res;
 
-                PartitionResult partRes = ((QueryCursorImpl<List<?>>)fieldsCur).partitionResult();
+                PartitionResult partRes = null;
+
+                if (fieldsCur instanceof QueryCursorImpl)
+                    partRes = ((QueryCursorImpl<List<?>>)fieldsCur).partitionResult();
 
                 if (cur.isQuery())
                     res = new JdbcQueryExecuteResult(cur.cursorId(), cur.fetchRows(), !cur.hasNext(),
@@ -678,7 +700,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                 boolean last = true;
 
                 for (FieldsQueryCursor<List<?>> c : results) {
-                    QueryCursorImpl qryCur = (QueryCursorImpl)c;
+                    QueryCursorEx<List<?>> qryCur = (QueryCursorEx<List<?>>)c;
 
                     JdbcResultInfo jdbcRes;
 
@@ -723,6 +745,21 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         }
     }
 
+    /** */
+    private List<FieldsQueryCursor<List<?>>> querySqlFields(SqlFieldsQueryEx qry, GridQueryCancel cancel) {
+        if (experimentalQueryEngine != null) {
+            try {
+                return experimentalQueryEngine.query(QueryContext.of(qry, cancel), qry.getSchema(), qry.getSql(), qry.getArgs());
+            }
+            catch (IgniteSQLException e) {
+                U.warn(log, "Failed to execute SQL query using experimental engine. [qry=" + qry + ']', e);
+            }
+        }
+
+        return connCtx.kernalContext().query().querySqlFields(null, qry,
+            cliCtx, true, protocolVer.compareTo(VER_2_3_0) < 0, cancel);
+    }
+
     /**
      * {@link JdbcQueryCloseRequest} command handler.
      *
@@ -1011,7 +1048,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                 if (cur instanceof BulkLoadContextCursor)
                     throw new IgniteSQLException("COPY command cannot be executed in batch mode.");
 
-                assert !((QueryCursorImpl)cur).isQuery();
+                assert !((QueryCursorEx)cur).isQuery();
 
                 Iterator<List<?>> it = cur.iterator();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
index 4e8cd75..67dedc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
@@ -168,6 +168,10 @@ public class PlatformContinuousQueryImpl implements PlatformContinuousQuery {
                         @Override public List<GridQueryFieldMetadata> fieldsMeta() {
                             return null;
                         }
+
+                        @Override public boolean isQuery() {
+                            return false;
+                        }
                     }, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE);
             }
             catch (Exception e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java
index 1abf7d6..a9dbe02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java
@@ -50,7 +50,7 @@ public final class QueryContext {
      * @param params Context parameters.
      * @return Query context.
      */
-    public static QueryContext of(String schemaName, Object... params) {
+    public static QueryContext of(Object... params) {
         return !F.isEmpty(params) ? new QueryContext(build(null, params).toArray()) : new QueryContext(EMPTY);
     }