You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2021/11/23 15:44:05 UTC

[ignite-3] branch main updated: IGNITE-15838 Integrate plugin system in current SQL engine (#464)

This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4572913  IGNITE-15838 Integrate plugin system in current SQL engine (#464)
4572913 is described below

commit 45729130397062c188dc9bc2558d5014f0729d0e
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Tue Nov 23 18:44:00 2021 +0300

    IGNITE-15838 Integrate plugin system in current SQL engine (#464)
---
 .../query/calcite/SqlQueryProcessor.java           |  62 ++++-
 .../query/calcite/exec/ExecutionContext.java       |  17 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |  18 +-
 .../query/calcite/exec/LogicalRelImplementor.java  |  24 +-
 .../calcite/extension/ExternalConvention.java      |  59 ++++
 .../query/calcite/extension/SqlExtension.java      |  47 ++--
 .../calcite/metadata/IgniteMdFragmentMapping.java  |  30 +-
 .../processors/query/calcite/prepare/Cloner.java   | 214 +--------------
 .../query/calcite/prepare/IgniteRelShuttle.java    |   6 +
 .../query/calcite/prepare/MappingQueryContext.java |  18 +-
 .../query/calcite/prepare/PlannerPhase.java        | 302 ++++++++++-----------
 .../query/calcite/prepare/PlanningContext.java     |  11 +
 .../processors/query/calcite/prepare/Splitter.java |   7 +
 .../query/calcite/rel/IgniteGateway.java           | 144 ++++++++++
 .../query/calcite/rel/IgniteRelVisitor.java        |   5 +
 .../query/calcite/schema/SchemaHolderImpl.java     |  45 +++
 .../processors/query/calcite/trait/TraitUtils.java | 105 ++++---
 .../query/calcite/util/BaseQueryContext.java       |  31 ++-
 .../processors/query/calcite/util/Commons.java     |   5 -
 .../processors/query/calcite/util/TypeUtils.java   |   5 +-
 .../query/calcite/planner/PlannerTest.java         |  17 +-
 .../internal/calcite/ItSqlExtensionTest.java       |  69 +++++
 .../internal/calcite/extension/TestExtension.java  | 168 ++++++++++++
 .../calcite/extension/TestFilterConverterRule.java |  59 ++++
 .../internal/calcite/extension/TestPhysFilter.java | 143 ++++++++++
 .../calcite/extension/TestPhysTableScan.java       |  78 ++++++
 .../internal/calcite/extension/TestTableImpl.java  | 101 +++++++
 ...processors.query.calcite.extension.SqlExtension |   1 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   7 +
 29 files changed, 1331 insertions(+), 467 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
index 3b297f2..d23c6d0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
@@ -18,9 +18,12 @@
 package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
@@ -28,6 +31,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionService
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
@@ -52,12 +56,6 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Size of the cache for query plans. */
     public static final int PLAN_CACHE_SIZE = 1024;
 
-    private volatile ExecutionService executionSrvc;
-
-    private volatile MessageService msgSrvc;
-
-    private volatile QueryTaskExecutor taskExecutor;
-
     private final ClusterService clusterSrvc;
 
     private final TableManager tableManager;
@@ -69,7 +67,15 @@ public class SqlQueryProcessor implements QueryProcessor {
     private final QueryPlanCache planCache = new QueryPlanCacheImpl(PLAN_CACHE_SIZE);
 
     /** Event listeners to close. */
-    private final List<Pair<TableEvent, EventListener>> evtLsnrs = new ArrayList<>();
+    private final List<Pair<TableEvent, EventListener<TableEventParameters>>> evtLsnrs = new ArrayList<>();
+
+    private volatile ExecutionService executionSrvc;
+
+    private volatile MessageService msgSrvc;
+
+    private volatile QueryTaskExecutor taskExecutor;
+
+    private volatile Map<String, SqlExtension> extensions;
 
     public SqlQueryProcessor(
             ClusterService clusterSrvc,
@@ -90,6 +96,16 @@ public class SqlQueryProcessor implements QueryProcessor {
                 taskExecutor
         );
 
+        List<SqlExtension> extensionList = new ArrayList<>();
+
+        ServiceLoader<SqlExtension> loader = ServiceLoader.load(SqlExtension.class);
+
+        loader.reload();
+
+        loader.forEach(extensionList::add);
+
+        extensions = extensionList.stream().collect(Collectors.toMap(SqlExtension::name, Function.identity()));
+
         SchemaHolderImpl schemaHolder = new SchemaHolderImpl(planCache::clear);
 
         executionSrvc = new ExecutionServiceImpl<>(
@@ -98,7 +114,8 @@ public class SqlQueryProcessor implements QueryProcessor {
                 planCache,
                 schemaHolder,
                 taskExecutor,
-                ArrayRowHandler.INSTANCE
+                ArrayRowHandler.INSTANCE,
+                extensions
         );
 
         registerTableListener(TableEvent.CREATE, new TableCreatedListener(schemaHolder));
@@ -109,6 +126,8 @@ public class SqlQueryProcessor implements QueryProcessor {
         msgSrvc.start();
         executionSrvc.start();
         planCache.start();
+
+        extensionList.forEach(ext -> ext.init(catalog -> schemaHolder.registerExternalCatalog(ext.name(), catalog)));
     }
 
     private void registerTableListener(TableEvent evt, AbstractTableEventListener lsnr) {
@@ -118,21 +137,34 @@ public class SqlQueryProcessor implements QueryProcessor {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override
     public void stop() throws Exception {
         busyLock.block();
 
-        List<AutoCloseable> toClose = new ArrayList<>(Arrays.asList(
+        List<AutoCloseable> toClose = new ArrayList<>();
+
+        Map<String, SqlExtension> extensions = this.extensions;
+        if (extensions != null) {
+            toClose.addAll(
+                    extensions.values().stream()
+                            .map(ext -> (AutoCloseable) ext::stop)
+                            .collect(Collectors.toList())
+            );
+        }
+
+        Stream<AutoCloseable> closableComponents = Stream.of(
                 executionSrvc::stop,
                 msgSrvc::stop,
                 taskExecutor::stop,
                 planCache::stop
-        ));
+        );
+
+        Stream<AutoCloseable> closableListeners = evtLsnrs.stream()
+                .map((p) -> () -> tableManager.removeListener(p.left, p.right));
 
-        toClose.addAll(evtLsnrs.stream()
-                .map((p) -> (AutoCloseable) () -> tableManager.removeListener(p.left, p.right))
-                .collect(Collectors.toList()));
+        toClose.addAll(
+                Stream.concat(closableComponents, closableListeners).collect(Collectors.toList())
+        );
 
         IgniteUtils.closeAll(toClose);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 4e1cdca..b721787 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -32,6 +32,7 @@ import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Runtime context allowing access to the tables in a database.
@@ -76,8 +78,8 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
     private final AtomicBoolean cancelFlag = new AtomicBoolean();
 
     /**
-     * Need to store timestamp, since SQL standard says that functions such as CURRENT_TIMESTAMP return the same value
-     * throughout the query.
+     * Need to store timestamp, since SQL standard says that functions such as CURRENT_TIMESTAMP return the same value throughout the
+     * query.
      */
     private final long startTs;
 
@@ -87,7 +89,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
      * Constructor.
      *
      * @param executor     Task executor.
-     * @param qctx          Base query context.
+     * @param qctx         Base query context.
      * @param qryId        Query ID.
      * @param fragmentDesc Partitions information.
      * @param handler      Row handler.
@@ -210,6 +212,15 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
         return topVer;
     }
 
+    /**
+     * Get an extensions by it's name.
+     *
+     * @return An extensions or {@code null} if there is no extension with given name.
+     */
+    public @Nullable SqlExtension extension(String name) {
+        return qctx.extension(name);
+    }
+
     /** {@inheritDoc} */
     @Override
     public SchemaPlus getRootSchema() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 50a86a4..7974fcd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
 import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
@@ -142,6 +143,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
 
     private final DdlSqlToCommandConverter ddlConverter;
 
+    private final Map<String, SqlExtension> extensions;
+
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -152,13 +155,15 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
             QueryPlanCache planCache,
             SchemaHolder schemaHolder,
             QueryTaskExecutor taskExecutor,
-            RowHandler<RowT> handler
+            RowHandler<RowT> handler,
+            Map<String, SqlExtension> extensions
     ) {
         this.topSrvc = topSrvc;
         this.handler = handler;
         this.msgSrvc = msgSrvc;
         this.schemaHolder = schemaHolder;
         this.taskExecutor = taskExecutor;
+        this.extensions = extensions;
 
         locNodeId = topSrvc.localMember().id();
         qryPlanCache = planCache;
@@ -227,7 +232,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
             BaseQueryContext qctx,
             Object[] params
     ) {
-        plan.init(mappingSrvc, new MappingQueryContext(locNodeId, topologyVersion()));
+        plan.init(mappingSrvc, new MappingQueryContext(qctx, locNodeId, topologyVersion()));
 
         List<Fragment> fragments = plan.fragments();
 
@@ -365,6 +370,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
                                 .build()
                 )
                 .logger(LOG)
+                .extensions(extensions)
                 .build();
     }
 
@@ -383,9 +389,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
         ValidationResult validated = planner.validateAndGetTypeMetadata(sqlNode);
 
         sqlNode = validated.sqlNode();
-        
+
         IgniteRel igniteRel = optimize(sqlNode, planner);
-        
+
         // Split query plan to query fragments.
         List<Fragment> fragments = new Splitter().go(igniteRel);
 
@@ -444,7 +450,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
 
         // Convert to Relational operators graph
         IgniteRel igniteRel = optimize(sqlNode, planner);
-        
+
         // Split query plan to query fragments.
         List<Fragment> fragments = new Splitter().go(igniteRel);
 
@@ -471,7 +477,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
 
         // Convert to Relational operators graph
         IgniteRel igniteRel = optimize(sql, planner);
-        
+
         String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
 
         return new ExplainPlan(plan, explainFieldsMetadata(ctx));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 6286559..e214429 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
  * Implements a query plan.
@@ -317,6 +319,9 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
         ImmutableBitSet requiredColumns = rel.requiredColumns();
 
         InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
+
+        assert tbl != null;
+
         IgniteTypeFactory typeFactory = ctx.getTypeFactory();
 
         RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
@@ -339,6 +344,18 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
 
     /** {@inheritDoc} */
     @Override
+    public Node<RowT> visit(IgniteGateway rel) {
+        var extension = ctx.extension(rel.extensionName());
+
+        if (extension == null) {
+            throw new IgniteInternalException("Unknown SQL extension \"" + rel.extensionName() + "\"");
+        }
+
+        return extension.<RowT>implementor().implement(ctx, (IgniteRel) rel.getInput());
+    }
+
+    /** {@inheritDoc} */
+    @Override
     public Node<RowT> visit(IgniteValues rel) {
         List<RexLiteral> vals = Commons.flat(Commons.cast(rel.getTuples()));
 
@@ -493,7 +510,11 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
             case INSERT:
             case UPDATE:
             case DELETE:
-                ModifyNode<RowT> node = new ModifyNode<>(ctx, rel.getRowType(), rel.getTable().unwrap(InternalIgniteTable.class),
+                InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
+
+                assert tbl != null;
+
+                ModifyNode<RowT> node = new ModifyNode<>(ctx, rel.getRowType(), tbl,
                         rel.getOperation(), rel.getUpdateColumnList());
 
                 Node<RowT> input = visit(rel.getInput());
@@ -697,6 +718,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
         return visit((IgniteRel) rel);
     }
 
+    @SuppressWarnings("unchecked")
     public <T extends Node<RowT>> T go(IgniteRel rel) {
         return (T) visit(rel);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/ExternalConvention.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/ExternalConvention.java
new file mode 100644
index 0000000..6580ee0
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/ExternalConvention.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.extension;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+
+/**
+ * A handy class to introduce a new convention to the engine via Extension API.
+ */
+public class ExternalConvention extends Convention.Impl {
+    /**
+     * Constructor.
+     *
+     * @param name     The name of the convention. Must be equal to {@link SqlExtension#name()}.
+     * @param relClass The interface (usually, but could be a class as well) every relation of given convention have to implement.
+     */
+    public ExternalConvention(String name, Class<? extends IgniteRel> relClass) {
+        super(name, relClass);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelNode enforce(RelNode rel, RelTraitSet toTraits) {
+        return TraitUtils.enforce(rel, toTraits);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ConventionTraitDef getTraitDef() {
+        return ConventionTraitDef.INSTANCE;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean canConvertConvention(Convention toConvention) {
+        return toConvention == IgniteConvention.INSTANCE;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/SqlExtension.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/SqlExtension.java
index 7fa84ed..93abdb4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/SqlExtension.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/extension/SqlExtension.java
@@ -20,25 +20,30 @@ package org.apache.ignite.internal.processors.query.calcite.extension;
 import java.util.List;
 import java.util.Set;
 import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.query.calcite.SqlQueryProcessor;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Entry point to extend current sql engine with external storage or even custom execution.
+ *
+ * <h3>Extension lifecycle</h3>
+ * All extensions are created in a start phase of {@link SqlQueryProcessor} and initialized after all other components
+ * of {@link SqlQueryProcessor} have been started.
+ *
+ * <p>All extensions are stopped in the very beginning of the stop phase of {@link SqlQueryProcessor}, just before the other components.
  */
 public interface SqlExtension {
     /**
      * Returns the name of the current extension.
      *
      * <p>This name will be used to distinguish between different
-     * extensions. Also the {@link CatalogUpdateListener} will register
-     * provided catalog with the same name.
+     * extensions. Also the {@link CatalogUpdateListener} will register provided catalog with the same name.
      *
      * @return Name of the extension.
      */
@@ -47,29 +52,29 @@ public interface SqlExtension {
     /**
      * Initializes the extension before use.
      *
-     * @param ignite Instance of the current Ignite node.
-     * @param catalogUpdateListener Listener to notify when new table or schema
-     *                              are available. Note: the catalog listener creates
-     *                              copy of the provided catalog, so its not enough
-     *                              to just update {@link ExternalCatalog catalog} or
-     *                              {@link ExternalSchema schema}, the listener should
-     *                              be called explicitly to register changes.
+     * @param catalogUpdateListener Listener to notify when new table or schema are available. Note: the catalog listener creates copy of
+     *                              the provided catalog, so its not enough to just update {@link ExternalCatalog catalog} or {@link
+     *                              ExternalSchema schema}, the listener should be called explicitly to register changes.
      * @see ExternalSchema
      * @see ExternalCatalog
      */
     void init(
-            Ignite ignite,
             CatalogUpdateListener catalogUpdateListener
     );
 
     /**
+     * Stops the extension.
+     */
+    default void stop() {}
+
+    /**
      * Returns a set of optimization rules for given optimization phase.
      *
      * @param phase Current optimization phase.
      * @return Set of rules, or empty set if there are no rules for given phase.
      */
     Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase);
-    
+
     /**
      * Returns an implementor of relations provided by current extension.
      *
@@ -80,7 +85,7 @@ public interface SqlExtension {
      * @see RelImplementor
      */
     <RowT> RelImplementor<RowT> implementor();
-    
+
     /**
      * Returns colocation group for given relational tree.
      *
@@ -91,7 +96,7 @@ public interface SqlExtension {
      * @return Colocation of given relation tree.
      */
     ColocationGroup colocationGroup(IgniteRel node);
-    
+
     /**
      * Implementer to create execution nodes from provided relational nodes.
      *
@@ -108,36 +113,36 @@ public interface SqlExtension {
          * <p>It's guaranteed that the tree will only consist of the relations
          * provided by the current extension.
          *
-         * @param ctx An execution context.
+         * @param ctx  An execution context.
          * @param node A root of the relational tree.
          * @return A root of the resulting execution tree.
          */
         Node<RowT> implement(ExecutionContext<RowT> ctx, IgniteRel node);
     }
-    
+
     /**
      * Represents an external SQL schema that is simply a group different tables.
      */
     interface ExternalSchema {
         /** Returns list of all tables provided by current schema. */
         List<String> tableNames();
-    
+
         /**
          * Returns table by its name.
          *
          * @param name Name of the table.
          * @return The table, or {@code null} if there is no table with given name.
          */
-        @Nullable RelOptTable table(String name);
+        @Nullable IgniteTable table(String name);
     }
-    
+
     /**
      * Represents an external SQL catalog that is simply a group different schemas.
      */
     interface ExternalCatalog {
         /** Returns list of all schemas provided by current catalog. */
         List<String> schemaNames();
-    
+
         /**
          * Returns schema by its name.
          *
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
index 2c3139a..4fd597b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
@@ -33,8 +33,10 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetada
 import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableFunctionScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
@@ -43,6 +45,7 @@ import org.apache.ignite.internal.processors.query.calcite.schema.InternalIgnite
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
  * Implementation class for {@link RelMetadataQueryEx#fragmentMapping(RelNode, MappingQueryContext)} method call.
@@ -103,10 +106,9 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
      * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
      *
      * <p>{@link ColocationMappingException} may be thrown on two children nodes locations merge. This means that the fragment
-     * (which part the parent node is) cannot be executed on any node and additional exchange is needed.
-     * This case we throw {@link NodeMappingException} with an edge, where we need the additional exchange.
-     * After the exchange is put into the fragment and the fragment is split into two ones,
-     * fragment meta information will be recalculated for all fragments.
+     * (which part the parent node is) cannot be executed on any node and additional exchange is needed. This case we throw {@link
+     * NodeMappingException} with an edge, where we need the additional exchange. After the exchange is put into the fragment and the
+     * fragment is split into two ones, fragment meta information will be recalculated for all fragments.
      */
     public FragmentMapping fragmentMapping(BiRel rel, RelMetadataQuery mq, MappingQueryContext ctx) {
         RelNode left = rel.getLeft();
@@ -139,10 +141,9 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
      * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}
      *
      * <p>{@link ColocationMappingException} may be thrown on two children nodes locations merge. This means that the
-     * fragment (which part the parent node is) cannot be executed on any node and additional exchange is needed. This
-     * case we throw {@link NodeMappingException} with an edge, where we need the additional exchange. After the
-     * exchange is put into the fragment and the fragment is split into two ones, fragment meta information will be
-     * recalculated for all fragments.
+     * fragment (which part the parent node is) cannot be executed on any node and additional exchange is needed. This case we throw {@link
+     * NodeMappingException} with an edge, where we need the additional exchange. After the exchange is put into the fragment and the
+     * fragment is split into two ones, fragment meta information will be recalculated for all fragments.
      */
     public FragmentMapping fragmentMapping(SetOp rel, RelMetadataQuery mq, MappingQueryContext ctx) {
         FragmentMapping res = null;
@@ -222,6 +223,19 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM
     /**
      * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
      */
+    public FragmentMapping fragmentMapping(IgniteGateway rel, RelMetadataQuery mq, MappingQueryContext ctx) {
+        var extension = ctx.extension(rel.extensionName());
+
+        if (extension == null) {
+            throw new IgniteInternalException("Unknown SQL extension \"" + rel.extensionName() + "\"");
+        }
+
+        return FragmentMapping.create(rel.sourceId(), extension.colocationGroup((IgniteRel) rel.getInput()));
+    }
+
+    /**
+     * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery, MappingQueryContext)}.
+     */
     public FragmentMapping fragmentMapping(IgniteTableFunctionScan rel, RelMetadataQuery mq, MappingQueryContext ctx) {
         return FragmentMapping.create();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 58a8b0f..7891173 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -17,47 +17,18 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import static org.apache.ignite.internal.util.ArrayUtils.asList;
-
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableFunctionScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
-import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  * Cloner.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public class Cloner implements IgniteRelVisitor<IgniteRel> {
+public class Cloner {
     private final RelOptCluster cluster;
 
     private List<IgniteReceiver> remotes;
@@ -94,182 +65,21 @@ public class Cloner implements IgniteRelVisitor<IgniteRel> {
         return c.visit(r);
     }
 
-    private IgniteReceiver collect(IgniteReceiver receiver) {
-        if (remotes != null) {
-            remotes.add(receiver);
+    private IgniteRel collect(IgniteRel rel) {
+        if (rel instanceof IgniteReceiver && remotes != null) {
+            remotes.add((IgniteReceiver) rel);
         }
 
-        return receiver;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteSender rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteFilter rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteTrimExchange rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteProject rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteTableModify rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteNestedLoopJoin rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
-                visit((IgniteRel) rel.getRight())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
-                visit((IgniteRel) rel.getRight())));
+        return rel;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteMergeJoin rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
-                visit((IgniteRel) rel.getRight())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteIndexScan rel) {
-        return rel.clone(cluster, asList());
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteTableScan rel) {
-        return rel.clone(cluster, asList());
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteValues rel) {
-        return rel.clone(cluster, asList());
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteUnionAll rel) {
-        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteSort rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteTableSpool rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteSortedIndexSpool rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteLimit rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteReceiver rel) {
-        return collect((IgniteReceiver) rel.clone(cluster, asList()));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteExchange rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteSingleHashAggregate rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteMapHashAggregate rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteReduceHashAggregate rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteSingleSortAggregate rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteMapSortAggregate rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteReduceSortAggregate rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteHashIndexSpool rel) {
-        return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteSetOp rel) {
-        return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgniteRel visit(IgniteTableFunctionScan rel) {
-        return rel.clone(cluster, asList());
-    }
-
-    /** {@inheritDoc} */
-    @Override
+    /**
+     * Clones and associates a plan with a new cluster.
+     *
+     * @param rel The head of the relational tree.
+     * @return A new tree.
+     */
     public IgniteRel visit(IgniteRel rel) {
-        return rel.accept(this);
+        return collect(rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0))));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index a728aed..934d4a9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -21,6 +21,7 @@ import java.util.List;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
@@ -217,6 +218,11 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteGateway rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
     @Override
     public IgniteRel visit(IgniteRel rel) {
         return rel.accept(this);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 90f240b..27d8686 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -17,10 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
+import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Query mapping context.
  */
 public class MappingQueryContext {
+    private final BaseQueryContext qctx;
+
     private final String locNodeId;
 
     private final long topVer;
@@ -31,11 +37,21 @@ public class MappingQueryContext {
      * @param locNodeId Local node identifier.
      * @param topVer    Topology version to map.
      */
-    public MappingQueryContext(String locNodeId, long topVer) {
+    public MappingQueryContext(BaseQueryContext qctx, String locNodeId, long topVer) {
+        this.qctx = qctx;
         this.locNodeId = locNodeId;
         this.topVer = topVer;
     }
 
+    /**
+     * Get an extensions by it's name.
+     *
+     * @return An extensions or {@code null} if there is no extension with given name.
+     */
+    public @Nullable SqlExtension extension(String name) {
+        return qctx.extension(name);
+    }
+
     public String localNodeId() {
         return locNodeId;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index 19c553d..64ef4eb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -20,6 +20,9 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
 import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.cbo;
 import static org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePrograms.hep;
 
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
@@ -69,56 +72,45 @@ import org.apache.ignite.internal.processors.query.calcite.rule.logical.ProjectS
  * Represents a planner phase with its description and a used rule set.
  */
 public enum PlannerPhase {
-    HEP_DECORRELATE("Heuristic phase to decorrelate subqueries") {
+    HEP_DECORRELATE(
+            "Heuristic phase to decorrelate subqueries",
+            CoreRules.FILTER_SUB_QUERY_TO_CORRELATE,
+            CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE,
+            CoreRules.JOIN_SUB_QUERY_TO_CORRELATE
+    ) {
         /** {@inheritDoc} */
         @Override
-        public RuleSet getRules(PlanningContext ctx) {
-            return RuleSets.ofList(
-                    CoreRules.FILTER_SUB_QUERY_TO_CORRELATE,
-                    CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE,
-                CoreRules.JOIN_SUB_QUERY_TO_CORRELATE
-            );
-        }
-
-        /** {@inheritDoc} */
-        @Override public Program getProgram(PlanningContext ctx) {
+        public Program getProgram(PlanningContext ctx) {
             return hep(getRules(ctx));
         }
     },
 
-    HEP_FILTER_PUSH_DOWN("Heuristic phase to push down filters") {
-        /** {@inheritDoc} */
-        @Override public RuleSet getRules(PlanningContext ctx) {
-            return RuleSets.ofList(
-                CoreRules.FILTER_MERGE,
-                CoreRules.FILTER_AGGREGATE_TRANSPOSE,
-                CoreRules.FILTER_SET_OP_TRANSPOSE,
-                CoreRules.JOIN_CONDITION_PUSH,
-                CoreRules.FILTER_INTO_JOIN,
-                CoreRules.FILTER_PROJECT_TRANSPOSE
-            );
-        }
-
+    HEP_FILTER_PUSH_DOWN(
+            "Heuristic phase to push down filters",
+            CoreRules.FILTER_MERGE,
+            CoreRules.FILTER_AGGREGATE_TRANSPOSE,
+            CoreRules.FILTER_SET_OP_TRANSPOSE,
+            CoreRules.JOIN_CONDITION_PUSH,
+            CoreRules.FILTER_INTO_JOIN,
+            CoreRules.FILTER_PROJECT_TRANSPOSE
+    ) {
         /** {@inheritDoc} */
-        @Override public Program getProgram(PlanningContext ctx) {
+        @Override
+        public Program getProgram(PlanningContext ctx) {
             return hep(getRules(ctx));
         }
     },
 
-    HEP_PROJECT_PUSH_DOWN("Heuristic phase to push down and merge projects") {
-        /** {@inheritDoc} */
-        @Override public RuleSet getRules(PlanningContext ctx) {
-            return RuleSets.ofList(
-                ProjectScanMergeRule.TABLE_SCAN_SKIP_CORRELATED,
-                ProjectScanMergeRule.INDEX_SCAN_SKIP_CORRELATED,
-
-                CoreRules.JOIN_PUSH_EXPRESSIONS,
-                CoreRules.PROJECT_MERGE,
-                CoreRules.PROJECT_REMOVE,
-                CoreRules.PROJECT_FILTER_TRANSPOSE
-            );
-        }
+    HEP_PROJECT_PUSH_DOWN(
+            "Heuristic phase to push down and merge projects",
+            ProjectScanMergeRule.TABLE_SCAN_SKIP_CORRELATED,
+            ProjectScanMergeRule.INDEX_SCAN_SKIP_CORRELATED,
 
+            CoreRules.JOIN_PUSH_EXPRESSIONS,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE,
+            CoreRules.PROJECT_FILTER_TRANSPOSE
+    ) {
         /** {@inheritDoc} */
         @Override
         public Program getProgram(PlanningContext ctx) {
@@ -126,117 +118,111 @@ public enum PlannerPhase {
         }
     },
 
-    OPTIMIZATION("Main optimization phase") {
-        /** {@inheritDoc} */
-        @Override
-        public RuleSet getRules(PlanningContext ctx) {
-            return ctx.rules(
-                    RuleSets.ofList(
-                            FilterMergeRule.Config.DEFAULT
-                                    .withOperandFor(LogicalFilter.class).toRule(),
-
-                            JoinPushThroughJoinRule.Config.LEFT
-                                    .withOperandFor(LogicalJoin.class).toRule(),
-
-                            JoinPushThroughJoinRule.Config.RIGHT
-                                    .withOperandFor(LogicalJoin.class).toRule(),
-
-                            JoinPushExpressionsRule.Config.DEFAULT
-                                    .withOperandFor(LogicalJoin.class).toRule(),
-
-                            JoinConditionPushRule.Config.DEFAULT
-                                    .withOperandSupplier(b -> b.operand(LogicalJoin.class)
-                                            .anyInputs()).toRule(),
-
-                            FilterIntoJoinRule.Config.DEFAULT
-                                    .withOperandSupplier(b0 ->
-                                            b0.operand(LogicalFilter.class).oneInput(b1 ->
-                                                    b1.operand(LogicalJoin.class).anyInputs())).toRule(),
-
-                            FilterProjectTransposeRule.Config.DEFAULT
-                                    .withOperandFor(LogicalFilter.class, f -> true, LogicalProject.class, p -> true).toRule(),
-
-                            ProjectFilterTransposeRule.Config.DEFAULT
-                                    .withOperandFor(LogicalProject.class, LogicalFilter.class).toRule(),
-
-                            ProjectMergeRule.Config.DEFAULT
-                                    .withOperandFor(LogicalProject.class).toRule(),
-
-                            ProjectRemoveRule.Config.DEFAULT
-                                    .withOperandSupplier(b ->
-                                            b.operand(LogicalProject.class)
-                                                    .predicate(ProjectRemoveRule::isTrivial)
-                                                    .anyInputs()).toRule(),
-
-                            AggregateMergeRule.Config.DEFAULT
-                                    .withOperandSupplier(b0 ->
-                                            b0.operand(LogicalAggregate.class)
-                                                    .oneInput(b1 ->
-                                                            b1.operand(LogicalAggregate.class)
-                                                                    .predicate(Aggregate::isSimple)
-                                                                    .anyInputs())).toRule(),
-
-                            AggregateExpandDistinctAggregatesRule.Config.JOIN.toRule(),
-
-                            SortRemoveRule.Config.DEFAULT
-                                    .withOperandSupplier(b ->
-                                            b.operand(LogicalSort.class)
-                                                    .anyInputs()).toRule(),
-
-                            CoreRules.UNION_MERGE,
-                            CoreRules.MINUS_MERGE,
-                            CoreRules.INTERSECT_MERGE,
-                            CoreRules.UNION_REMOVE,
-                            CoreRules.JOIN_COMMUTE,
-                            CoreRules.AGGREGATE_REMOVE,
-                            CoreRules.JOIN_COMMUTE_OUTER,
-
-                            // Useful of this rule is not clear now.
-                            // CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
-
-                            PruneEmptyRules.SortFetchZeroRuleConfig.EMPTY
-                                    .withOperandSupplier(b ->
-                                            b.operand(LogicalSort.class).anyInputs())
-                                    .withDescription("PruneSortLimit0")
-                                    .as(PruneEmptyRules.SortFetchZeroRuleConfig.class)
-                                    .toRule(),
-
-                            ExposeIndexRule.INSTANCE,
-                            ProjectScanMergeRule.TABLE_SCAN,
-                            ProjectScanMergeRule.INDEX_SCAN,
-                            FilterSpoolMergeToSortedIndexSpoolRule.INSTANCE,
-                            FilterSpoolMergeToHashIndexSpoolRule.INSTANCE,
-                            FilterScanMergeRule.TABLE_SCAN,
-                            FilterScanMergeRule.INDEX_SCAN,
-
-                            LogicalOrToUnionRule.INSTANCE,
-
-                            CorrelatedNestedLoopJoinRule.INSTANCE,
-                            CorrelateToNestedLoopRule.INSTANCE,
-                            NestedLoopJoinConverterRule.INSTANCE,
-                            MergeJoinConverterRule.INSTANCE,
-
-                            ValuesConverterRule.INSTANCE,
-                            LogicalScanConverterRule.INDEX_SCAN,
-                            LogicalScanConverterRule.TABLE_SCAN,
-                            HashAggregateConverterRule.SINGLE,
-                            HashAggregateConverterRule.MAP_REDUCE,
-                            SortAggregateConverterRule.SINGLE,
-                            SortAggregateConverterRule.MAP_REDUCE,
-                            SetOpConverterRule.SINGLE_MINUS,
-                            SetOpConverterRule.MAP_REDUCE_MINUS,
-                            SetOpConverterRule.SINGLE_INTERSECT,
-                            SetOpConverterRule.MAP_REDUCE_INTERSECT,
-                            ProjectConverterRule.INSTANCE,
-                            FilterConverterRule.INSTANCE,
-                            TableModifyConverterRule.INSTANCE,
-                            UnionConverterRule.INSTANCE,
-                            SortConverterRule.INSTANCE,
-                            TableFunctionScanConverterRule.INSTANCE
-                    )
-            );
-        }
-
+    OPTIMIZATION(
+            "Main optimization phase",
+            FilterMergeRule.Config.DEFAULT
+                    .withOperandFor(LogicalFilter.class).toRule(),
+
+            JoinPushThroughJoinRule.Config.LEFT
+                    .withOperandFor(LogicalJoin.class).toRule(),
+
+            JoinPushThroughJoinRule.Config.RIGHT
+                    .withOperandFor(LogicalJoin.class).toRule(),
+
+            JoinPushExpressionsRule.Config.DEFAULT
+                    .withOperandFor(LogicalJoin.class).toRule(),
+
+            JoinConditionPushRule.Config.DEFAULT
+                    .withOperandSupplier(b -> b.operand(LogicalJoin.class)
+                            .anyInputs()).toRule(),
+
+            FilterIntoJoinRule.Config.DEFAULT
+                    .withOperandSupplier(b0 ->
+                            b0.operand(LogicalFilter.class).oneInput(b1 ->
+                                    b1.operand(LogicalJoin.class).anyInputs())).toRule(),
+
+            FilterProjectTransposeRule.Config.DEFAULT
+                    .withOperandFor(LogicalFilter.class, f -> true, LogicalProject.class, p -> true)
+                    .toRule(),
+
+            ProjectFilterTransposeRule.Config.DEFAULT
+                    .withOperandFor(LogicalProject.class, LogicalFilter.class).toRule(),
+
+            ProjectMergeRule.Config.DEFAULT
+                    .withOperandFor(LogicalProject.class).toRule(),
+
+            ProjectRemoveRule.Config.DEFAULT
+                    .withOperandSupplier(b ->
+                            b.operand(LogicalProject.class)
+                                    .predicate(ProjectRemoveRule::isTrivial)
+                                    .anyInputs()).toRule(),
+
+            AggregateMergeRule.Config.DEFAULT
+                    .withOperandSupplier(b0 ->
+                            b0.operand(LogicalAggregate.class)
+                                    .oneInput(b1 ->
+                                            b1.operand(LogicalAggregate.class)
+                                                    .predicate(Aggregate::isSimple)
+                                                    .anyInputs())).toRule(),
+
+            AggregateExpandDistinctAggregatesRule.Config.JOIN.toRule(),
+
+            SortRemoveRule.Config.DEFAULT
+                    .withOperandSupplier(b ->
+                            b.operand(LogicalSort.class)
+                                    .anyInputs()).toRule(),
+
+            CoreRules.UNION_MERGE,
+            CoreRules.MINUS_MERGE,
+            CoreRules.INTERSECT_MERGE,
+            CoreRules.UNION_REMOVE,
+            CoreRules.JOIN_COMMUTE,
+            CoreRules.AGGREGATE_REMOVE,
+            CoreRules.JOIN_COMMUTE_OUTER,
+
+            // Useful of this rule is not clear now.
+            // CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
+
+            PruneEmptyRules.SortFetchZeroRuleConfig.EMPTY
+                    .withOperandSupplier(b ->
+                            b.operand(LogicalSort.class).anyInputs())
+                    .withDescription("PruneSortLimit0")
+                    .as(PruneEmptyRules.SortFetchZeroRuleConfig.class)
+                    .toRule(),
+
+            ExposeIndexRule.INSTANCE,
+            ProjectScanMergeRule.TABLE_SCAN,
+            ProjectScanMergeRule.INDEX_SCAN,
+            FilterSpoolMergeToSortedIndexSpoolRule.INSTANCE,
+            FilterSpoolMergeToHashIndexSpoolRule.INSTANCE,
+            FilterScanMergeRule.TABLE_SCAN,
+            FilterScanMergeRule.INDEX_SCAN,
+
+            LogicalOrToUnionRule.INSTANCE,
+
+            CorrelatedNestedLoopJoinRule.INSTANCE,
+            CorrelateToNestedLoopRule.INSTANCE,
+            NestedLoopJoinConverterRule.INSTANCE,
+            MergeJoinConverterRule.INSTANCE,
+
+            ValuesConverterRule.INSTANCE,
+            LogicalScanConverterRule.INDEX_SCAN,
+            LogicalScanConverterRule.TABLE_SCAN,
+            HashAggregateConverterRule.SINGLE,
+            HashAggregateConverterRule.MAP_REDUCE,
+            SortAggregateConverterRule.SINGLE,
+            SortAggregateConverterRule.MAP_REDUCE,
+            SetOpConverterRule.SINGLE_MINUS,
+            SetOpConverterRule.MAP_REDUCE_MINUS,
+            SetOpConverterRule.SINGLE_INTERSECT,
+            SetOpConverterRule.MAP_REDUCE_INTERSECT,
+            ProjectConverterRule.INSTANCE,
+            FilterConverterRule.INSTANCE,
+            TableModifyConverterRule.INSTANCE,
+            UnionConverterRule.INSTANCE,
+            SortConverterRule.INSTANCE,
+            TableFunctionScanConverterRule.INSTANCE
+    ) {
         /** {@inheritDoc} */
         @Override
         public Program getProgram(PlanningContext ctx) {
@@ -246,11 +232,17 @@ public enum PlannerPhase {
 
     public final String description;
 
+    private final List<RelOptRule> rules;
+
     /**
-     * Set phase description.
+     * Constructor.
+     *
+     * @param description A description of the phase.
+     * @param rules A list of rules associated with the current phase.
      */
-    PlannerPhase(String description) {
+    PlannerPhase(String description, RelOptRule... rules) {
         this.description = description;
+        this.rules = List.of(rules);
     }
 
     /**
@@ -259,7 +251,13 @@ public enum PlannerPhase {
      * @param ctx Planner context.
      * @return Rule set.
      */
-    public abstract RuleSet getRules(PlanningContext ctx);
+    public RuleSet getRules(PlanningContext ctx) {
+        List<RelOptRule> rules = new ArrayList<>(this.rules);
+
+        ctx.extensions().forEach(p -> rules.addAll(p.getOptimizerRules(this)));
+
+        return ctx.rules(RuleSets.ofList(rules));
+    }
 
     /**
      * Returns a program, calculated on the basis of query, planner context planner phase and rules set.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index 8e6be74..4fb4504 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import java.util.List;
 import java.util.function.Function;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
@@ -27,6 +28,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.RuleSet;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
 import org.jetbrains.annotations.NotNull;
@@ -66,6 +68,15 @@ public final class PlanningContext implements Context {
     }
 
     /**
+     * Get list of extensions.
+     *
+     * @return List of extensions.
+     */
+    public List<SqlExtension> extensions() {
+        return unwrap(BaseQueryContext.class).extensions();
+    }
+
+    /**
      * Get query.
      */
     public String query() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index 6bcbd68..738d86c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -104,6 +105,12 @@ public class Splitter extends IgniteRelShuttle {
         return rel.clone(IdGenerator.nextId());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel visit(IgniteGateway rel) {
+        return rel.clone(IdGenerator.nextId());
+    }
+
     private static class FragmentProto {
         private final long id;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteGateway.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteGateway.java
new file mode 100644
index 0000000..aa4b7d5
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteGateway.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+
+/**
+ * A gateway node connecting relational trees with different conventions.
+ */
+public class IgniteGateway extends SingleRel implements SourceAwareIgniteRel {
+    private static final String EXTENSION_NAME_TERM = "name";
+
+    private final String extensionName;
+
+    private final long sourceId;
+
+    /**
+     * Constructor.
+     *
+     * @param extensionName A name of the extension the input node belongs to.
+     * @param cluster       Cluster this node belongs to.
+     * @param traits        A set of the traits this node satisfy.
+     * @param input         An input relation.
+     */
+    public IgniteGateway(String extensionName, RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+        this(extensionName, -1L, cluster, traits, input);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param extensionName A name of the extension the input node belongs to.
+     * @param sourceId      An id of the source this gateway belongs to.
+     * @param cluster       Cluster this node belongs to.
+     * @param traits        A set of the traits this node satisfy.
+     * @param input         An input relation.
+     */
+    private IgniteGateway(String extensionName, long sourceId, RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+        super(cluster, traits, input);
+
+        this.sourceId = sourceId;
+        this.extensionName = extensionName;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param input Context to recover this relation from.
+     */
+    public IgniteGateway(RelInput input) {
+        super(input.getCluster(), input.getTraitSet().replace(IgniteConvention.INSTANCE), input.getInput());
+
+        extensionName = input.getString(EXTENSION_NAME_TERM);
+
+        Object srcIdObj = input.get("sourceId");
+        if (srcIdObj != null) {
+            sourceId = ((Number) srcIdObj).longValue();
+        } else {
+            sourceId = -1;
+        }
+    }
+
+    /**
+     * Returns the name of the extension the input node belongs to.
+     *
+     * @return The name of the extension.
+     */
+    public String extensionName() {
+        return extensionName;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw)
+                .item(EXTENSION_NAME_TERM, extensionName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isEnforcer() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return planner.getCostFactory().makeZeroCost();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteGateway(extensionName, cluster, getTraitSet(), sole(inputs));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel clone(long sourceId) {
+        return new IgniteGateway(extensionName, sourceId, getCluster(), getTraitSet(), getInput());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteGateway(extensionName, getCluster(), traitSet, sole(inputs));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long sourceId() {
+        return sourceId;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 457465b..a4203e0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -165,6 +165,11 @@ public interface IgniteRelVisitor<T> {
     T visit(IgniteTableFunctionScan rel);
 
     /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}.
+     */
+    T visit(IgniteGateway rel);
+
+    /**
      * Visits a relational node and calculates a result on the basis of node meta information.
      *
      * @param rel Relational node.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 6a9dfab..9d9674d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -22,8 +22,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension.ExternalCatalog;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension.ExternalSchema;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.table.TableImpl;
@@ -35,6 +40,8 @@ import org.jetbrains.annotations.Nullable;
 public class SchemaHolderImpl implements SchemaHolder {
     private final Map<String, IgniteSchema> igniteSchemas = new HashMap<>();
 
+    private final Map<String, Schema> externalCatalogs = new HashMap<>();
+
     private final Runnable onSchemaUpdatedCallback;
 
     private volatile SchemaPlus calciteSchema;
@@ -57,6 +64,28 @@ public class SchemaHolderImpl implements SchemaHolder {
         return schema != null ? calciteSchema.getSubSchema(schema) : calciteSchema;
     }
 
+    /**
+     * Register an external catalog under given name.
+     *
+     * @param name Name of the external catalog.
+     * @param catalog Catalog to register.
+     */
+    public synchronized void registerExternalCatalog(String name, ExternalCatalog catalog) {
+        catalog.schemaNames().forEach(schemaName -> registerExternalSchema(name, schemaName, catalog.schema(schemaName)));
+
+        rebuild();
+    }
+
+    private void registerExternalSchema(String catalogName, String schemaName, ExternalSchema schema) {
+        Map<String, Table> tables = new HashMap<>();
+
+        schema.tableNames().forEach(name -> tables.put(name, schema.table(name)));
+
+        SchemaPlus schemaPlus = (SchemaPlus) externalCatalogs.computeIfAbsent(catalogName, n -> Frameworks.createRootSchema(false));
+
+        schemaPlus.add(schemaName, new ExternalSchemaHolder(tables));
+    }
+
     public synchronized void onSchemaCreated(String schemaName) {
         igniteSchemas.putIfAbsent(schemaName, new IgniteSchema(schemaName));
         rebuild();
@@ -126,8 +155,12 @@ public class SchemaHolderImpl implements SchemaHolder {
 
     private void rebuild() {
         SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
+
         newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
+
         igniteSchemas.forEach(newCalciteSchema::add);
+        externalCatalogs.forEach(newCalciteSchema::add);
+
         calciteSchema = newCalciteSchema;
 
         onSchemaUpdatedCallback.run();
@@ -136,4 +169,16 @@ public class SchemaHolderImpl implements SchemaHolder {
     private static String removeSchema(String schemaName, String canonicalName) {
         return canonicalName.substring(schemaName.length() + 1);
     }
+
+    private static class ExternalSchemaHolder extends AbstractSchema {
+        private final Map<String, Table> tables;
+
+        public ExternalSchemaHolder(Map<String, Table> tables) {
+            this.tables = tables;
+        }
+
+        @Override protected Map<String, Table> getTableMap() {
+            return tables;
+        }
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index f16a989..c53da16 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -36,6 +36,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
@@ -63,6 +65,7 @@ import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteGateway;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
@@ -70,13 +73,11 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchang
 import org.jetbrains.annotations.Nullable;
 
 /**
- * TraitUtils.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * TraitUtils. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class TraitUtils {
     /**
-     * Enforce.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Enforce. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     @Nullable
     public static RelNode enforce(RelNode rel, RelTraitSet toTraits) {
@@ -119,6 +120,14 @@ public class TraitUtils {
 
         RelTraitDef converter = fromTrait.getTraitDef();
 
+        if (converter == ConventionTraitDef.INSTANCE) {
+            return convertConvention(planner, (Convention) toTrait, rel);
+        }
+
+        if (rel.getConvention() != IgniteConvention.INSTANCE) {
+            return null;
+        }
+
         if (converter == RelCollationTraitDef.INSTANCE) {
             return convertCollation(planner, (RelCollation) toTrait, rel);
         } else if (converter == DistributionTraitDef.INSTANCE) {
@@ -130,9 +139,20 @@ public class TraitUtils {
         }
     }
 
+    private static RelNode convertConvention(RelOptPlanner planner, Convention toTrait, RelNode rel) {
+        Convention fromTrait = rel.getConvention();
+
+        if (fromTrait.satisfies(toTrait)) {
+            return rel;
+        }
+
+        RelTraitSet traits = rel.getTraitSet().replace(toTrait);
+
+        return new IgniteGateway(fromTrait.getName(), rel.getCluster(), traits, rel);
+    }
+
     /**
-     * Convert collation.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Convert collation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     @Nullable
     public static RelNode convertCollation(RelOptPlanner planner,
@@ -149,8 +169,7 @@ public class TraitUtils {
     }
 
     /**
-     * Convert distribution.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Convert distribution. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     @Nullable
     public static RelNode convertDistribution(RelOptPlanner planner,
@@ -186,8 +205,7 @@ public class TraitUtils {
     }
 
     /**
-     * Convert rewindability.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Convert rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     @Nullable
     public static RelNode convertRewindability(RelOptPlanner planner,
@@ -231,8 +249,7 @@ public class TraitUtils {
     }
 
     /**
-     * Distribution.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Distribution. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static IgniteDistribution distribution(RelNode rel) {
         return rel instanceof IgniteRel
@@ -241,16 +258,14 @@ public class TraitUtils {
     }
 
     /**
-     * Distribution.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Distribution. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static IgniteDistribution distribution(RelTraitSet traits) {
         return traits.getTrait(DistributionTraitDef.INSTANCE);
     }
 
     /**
-     * Collation.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Collation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static RelCollation collation(RelNode rel) {
         return rel instanceof IgniteRel
@@ -259,16 +274,14 @@ public class TraitUtils {
     }
 
     /**
-     * Collation.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Collation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static RelCollation collation(RelTraitSet traits) {
         return traits.getTrait(RelCollationTraitDef.INSTANCE);
     }
 
     /**
-     * Rewindability.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static RewindabilityTrait rewindability(RelNode rel) {
         return rel instanceof IgniteRel
@@ -277,16 +290,14 @@ public class TraitUtils {
     }
 
     /**
-     * Rewindability.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static RewindabilityTrait rewindability(RelTraitSet traits) {
         return traits.getTrait(RewindabilityTraitDef.INSTANCE);
     }
 
     /**
-     * Correlation.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Correlation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static CorrelationTrait correlation(RelNode rel) {
         return rel instanceof IgniteRel
@@ -295,16 +306,14 @@ public class TraitUtils {
     }
 
     /**
-     * Correlation.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Correlation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static CorrelationTrait correlation(RelTraitSet traits) {
         return traits.getTrait(CorrelationTraitDef.INSTANCE);
     }
 
     /**
-     * ChangeTraits.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * ChangeTraits. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static RelInput changeTraits(RelInput input, RelTrait... traits) {
         RelTraitSet traitSet = input.getTraitSet();
@@ -435,8 +444,7 @@ public class TraitUtils {
     }
 
     /**
-     * ProjectCollation.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * ProjectCollation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static RelCollation projectCollation(RelCollation collation, List<RexNode> projects, RelDataType inputRowType) {
         if (collation.getFieldCollations().isEmpty()) {
@@ -449,8 +457,7 @@ public class TraitUtils {
     }
 
     /**
-     * ProjectDistribution.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * ProjectDistribution. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static IgniteDistribution projectDistribution(IgniteDistribution distribution, List<RexNode> projects,
             RelDataType inputRowType) {
@@ -464,16 +471,23 @@ public class TraitUtils {
     }
 
     /**
-     * PassThrough.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * PassThrough. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static Pair<RelTraitSet, List<RelTraitSet>> passThrough(TraitsAwareIgniteRel rel, RelTraitSet requiredTraits) {
-        if (requiredTraits.getConvention() != IgniteConvention.INSTANCE || rel.getInputs().isEmpty()) {
+        return passThrough(IgniteConvention.INSTANCE, rel, requiredTraits);
+    }
+
+    /**
+     * PassThrough. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     */
+    public static Pair<RelTraitSet, List<RelTraitSet>> passThrough(Convention convention, TraitsAwareIgniteRel rel,
+            RelTraitSet requiredTraits) {
+        if (requiredTraits.getConvention() != convention || rel.getInputs().isEmpty()) {
             return null;
         }
 
         List<RelTraitSet> inTraits = Collections.nCopies(rel.getInputs().size(),
-                rel.getCluster().traitSetOf(IgniteConvention.INSTANCE));
+                rel.getCluster().traitSetOf(convention));
 
         List<Pair<RelTraitSet, List<RelTraitSet>>> traits = new PropagationContext(Set.of(Pair.of(requiredTraits, inTraits)))
                 .propagate((in, outs) -> singletonListFromNullable(rel.passThroughCollation(in, outs)))
@@ -488,19 +502,29 @@ public class TraitUtils {
     }
 
     /**
-     * Derive.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Derive. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public static List<RelNode> derive(TraitsAwareIgniteRel rel, List<List<RelTraitSet>> inTraits) {
+        return derive(IgniteConvention.INSTANCE, rel, inTraits);
+    }
+
+    /**
+     * Derive. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     */
+    public static List<RelNode> derive(Convention convention, TraitsAwareIgniteRel rel, List<List<RelTraitSet>> inTraits) {
         assert !nullOrEmpty(inTraits);
 
-        RelTraitSet outTraits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE);
+        RelTraitSet outTraits = rel.getCluster().traitSetOf(convention);
         Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations = combinations(outTraits, inTraits);
 
         if (combinations.isEmpty()) {
             return List.of();
         }
 
+        if (inTraits.stream().flatMap(List::stream).anyMatch(traitSet -> traitSet.getConvention() != convention)) {
+            return List.of();
+        }
+
         return new PropagationContext(combinations)
                 .propagate(rel::deriveCollation)
                 .propagate(rel::deriveDistribution)
@@ -510,8 +534,7 @@ public class TraitUtils {
     }
 
     /**
-     * SingletonListFromNullable.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * SingletonListFromNullable. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      *
      * @param elem Elem.
      */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java
index 4605162..2f87e4d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/BaseQueryContext.java
@@ -20,6 +20,10 @@ package org.apache.ignite.internal.processors.query.calcite.util;
 import static org.apache.calcite.tools.Frameworks.createRootSchema;
 import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
@@ -37,6 +41,7 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.processors.query.calcite.QueryCancel;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
 import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
@@ -44,6 +49,7 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.logger.NullLogger;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Base query context.
@@ -99,6 +105,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
     private final RexBuilder rexBuilder;
     
     private final QueryCancel qryCancel;
+
+    private final Map<String, SqlExtension> extensions;
     
     private CalciteCatalogReader catalogReader;
     
@@ -108,7 +116,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
     private BaseQueryContext(
             FrameworkConfig cfg,
             Context parentCtx,
-            IgniteLogger log
+            IgniteLogger log,
+            Map<String, SqlExtension> extensions
     ) {
         super(Contexts.chain(parentCtx, cfg.getContext()));
         
@@ -116,6 +125,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
         this.cfg = Frameworks.newConfigBuilder(cfg).context(this).build();
         
         this.log = log;
+        this.extensions = extensions;
         
         RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, cfg.getTypeSystem());
         
@@ -133,7 +143,15 @@ public final class BaseQueryContext extends AbstractQueryContext {
     public static BaseQueryContext empty() {
         return EMPTY_CONTEXT;
     }
-    
+
+    public List<SqlExtension> extensions() {
+        return new ArrayList<>(extensions.values());
+    }
+
+    public @Nullable SqlExtension extension(String name) {
+        return extensions.get(name);
+    }
+
     public FrameworkConfig config() {
         return cfg;
     }
@@ -198,6 +216,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
         private Context parentCtx = Contexts.empty();
         
         private IgniteLogger log = new NullLogger();
+
+        private Map<String, SqlExtension> extensions = Collections.emptyMap();
         
         public Builder frameworkConfig(@NotNull FrameworkConfig frameworkCfg) {
             this.frameworkCfg = frameworkCfg;
@@ -213,9 +233,14 @@ public final class BaseQueryContext extends AbstractQueryContext {
             this.log = log;
             return this;
         }
+
+        public Builder extensions(Map<String, SqlExtension> extensions) {
+            this.extensions = extensions;
+            return this;
+        }
         
         public BaseQueryContext build() {
-            return new BaseQueryContext(frameworkCfg, parentCtx, log);
+            return new BaseQueryContext(frameworkCfg, parentCtx, log, extensions);
         }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 2a9110d..01be38e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorI
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.processors.query.calcite.prepare.AbstractMultiStepPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
@@ -825,8 +824,4 @@ public final class Commons {
     public static RelOptCluster cluster() {
         return CLUSTER;
     }
-
-    public static MappingQueryContext mapContext(String locNodeId, long topVer) {
-        return new MappingQueryContext(locNodeId, topVer);
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java
index f3ea81f..0b8e52d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.util;
 
+import static org.apache.calcite.util.Util.last;
 import static org.apache.ignite.internal.processors.query.calcite.util.Commons.nativeTypeToClass;
 import static org.apache.ignite.internal.processors.query.calcite.util.Commons.transform;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -227,11 +228,11 @@ public class TypeUtils {
             return typeFactory.getResultClass(type);
         }
 
-        RelOptTable table = schema.getTableForMember(origin.subList(0, 2));
+        RelOptTable table = schema.getTableForMember(origin.subList(0, origin.size() - 1));
 
         assert table != null;
 
-        ColumnDescriptor fldDesc = table.unwrap(TableDescriptor.class).columnDescriptor(origin.get(2));
+        ColumnDescriptor fldDesc = table.unwrap(TableDescriptor.class).columnDescriptor(last(origin));
 
         assert fldDesc != null;
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index a761c42..9363c4e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -60,7 +60,6 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.apache.ignite.internal.processors.query.calcite.util.BaseQueryContext;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
@@ -173,7 +172,7 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(plan);
 
-        plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+        plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
 
         assertNotNull(plan);
 
@@ -251,7 +250,7 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(plan);
 
-        plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+        plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
 
         assertNotNull(plan);
 
@@ -334,7 +333,7 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(plan);
 
-        plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+        plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
 
         assertEquals(3, plan.fragments().size());
     }
@@ -415,7 +414,7 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(plan);
 
-        plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+        plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
 
         assertNotNull(plan);
 
@@ -497,7 +496,7 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(plan);
 
-        plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+        plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
 
         assertEquals(3, plan.fragments().size());
     }
@@ -574,7 +573,7 @@ public class PlannerTest extends AbstractPlannerTest {
 
         assertNotNull(plan);
 
-        plan.init(this::intermediateMapping, Commons.mapContext(CollectionUtils.first(NODES), 0L));
+        plan.init(this::intermediateMapping, mapContext(CollectionUtils.first(NODES), 0L));
 
         assertNotNull(plan);
 
@@ -835,4 +834,8 @@ public class PlannerTest extends AbstractPlannerTest {
             @Nullable Predicate<ClusterNode> filter) {
         return single ? select(NODES, 0) : select(NODES, 0, 1, 2, 3);
     }
+
+    private static MappingQueryContext mapContext(String locNodeId, long topVer) {
+        return new MappingQueryContext(null, locNodeId, topVer);
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/ItSqlExtensionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/ItSqlExtensionTest.java
new file mode 100644
index 0000000..9403464
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/ItSqlExtensionTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite;
+
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.calcite.extension.TestExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test cases for SQL Extension API.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
+public class ItSqlExtensionTest extends AbstractBasicIntegrationTest {
+    /**
+     * Before all.
+     */
+    @BeforeAll
+    static void initTestData() {
+        createAndPopulateTable();
+    }
+
+    @Test
+    public void test() {
+        TestExtension.allNodes = CLUSTER_NODES.stream()
+                .map(i -> (IgniteImpl) i)
+                .map(IgniteImpl::id)
+                .collect(Collectors.toList());
+
+        assertQuery(""
+                + "select t.node_id,"
+                + "       t.num,"
+                + "       p.name"
+                + "  from person p"
+                + "  join TEST_EXT.CUSTOM_SCHEMA.TEST_TBL t "
+                + "    on p.id = t.num"
+                + " where t.num <> 1")
+                .columnNames("NODE_ID", "NUM", "NAME")
+                .columnTypes(String.class, Integer.class, String.class)
+                .returns(nodeId(0), 2, "Ilya")
+                .returns(nodeId(1), 2, "Ilya")
+                .returns(nodeId(2), 2, "Ilya")
+                .returns(nodeId(0), 3, "Roma")
+                .returns(nodeId(1), 3, "Roma")
+                .returns(nodeId(2), 3, "Roma")
+                .check();
+    }
+
+    private String nodeId(int idx) {
+        return ((IgniteImpl) CLUSTER_NODES.get(idx)).id();
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestExtension.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestExtension.java
new file mode 100644
index 0000000..22af344
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestExtension.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
+import org.apache.ignite.internal.processors.query.calcite.extension.CatalogUpdateListener;
+import org.apache.ignite.internal.processors.query.calcite.extension.ExternalConvention;
+import org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptorImpl;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A test extension implementation.
+ */
+public class TestExtension implements SqlExtension {
+    private static final String EXTENSION_NAME = "TEST_EXT";
+
+    private static final String TEST_TABLE_NAME = "TEST_TBL";
+
+    private static final String TEST_SCHEMA_NAME = "CUSTOM_SCHEMA";
+
+    static final Convention CONVENTION = new ExternalConvention(EXTENSION_NAME, IgniteRel.class);
+
+    public static List<String> allNodes;
+
+    /** {@inheritDoc} */
+    @Override
+    public String name() {
+        return EXTENSION_NAME;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void init(CatalogUpdateListener catalogUpdateListener) {
+        catalogUpdateListener.onCatalogUpdated(new ExternalCatalogImpl());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+        if (phase != PlannerPhase.OPTIMIZATION) {
+            return Set.of();
+        }
+
+        return Set.of(TestFilterConverterRule.INSTANCE);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> RelImplementor<RowT> implementor() {
+        return new RelImplementor<>() {
+            @Override
+            public Node<RowT> implement(ExecutionContext<RowT> ctx, IgniteRel node) {
+                if (node instanceof TestPhysFilter) {
+                    return implement(ctx, (TestPhysFilter) node);
+                }
+
+                if (node instanceof TestPhysTableScan) {
+                    return implement(ctx, (TestPhysTableScan) node);
+                }
+
+                throw new AssertionError("Unexpected node " + (node != null ? "of class " + node.getClass().getName() : "null"));
+            }
+
+            private Node<RowT> implement(ExecutionContext<RowT> ctx, TestPhysTableScan scan) {
+                RowHandler.RowFactory<RowT> factory = ctx.rowHandler().factory(ctx.getTypeFactory(), scan.getRowType());
+
+                return new ScanNode<>(
+                        ctx, scan.getRowType(), List.of(
+                        factory.create(ctx.localNodeId(), 1, UUID.randomUUID().toString()),
+                        factory.create(ctx.localNodeId(), 2, UUID.randomUUID().toString()),
+                        factory.create(ctx.localNodeId(), 3, UUID.randomUUID().toString())
+                )
+                );
+            }
+
+            private Node<RowT> implement(ExecutionContext<RowT> ctx, TestPhysFilter filter) {
+                Predicate<RowT> pred = ctx.expressionFactory().predicate(filter.getCondition(), filter.getRowType());
+
+                FilterNode<RowT> node = new FilterNode<>(ctx, filter.getRowType(), pred);
+
+                Node<RowT> input = implement(ctx, (IgniteRel) filter.getInput());
+
+                node.register(input);
+
+                return node;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ColocationGroup colocationGroup(IgniteRel node) {
+        return ColocationGroup.forNodes(allNodes);
+    }
+
+    private static class ExternalCatalogImpl implements ExternalCatalog {
+        private final Map<String, ExternalSchema> schemas = Map.of(TEST_SCHEMA_NAME, new ExternalSchemaImpl());
+
+        /** {@inheritDoc} */
+        @Override
+        public List<String> schemaNames() {
+            return List.of(TEST_SCHEMA_NAME);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public @Nullable ExternalSchema schema(String name) {
+            return schemas.get(name);
+        }
+    }
+
+    private static class ExternalSchemaImpl implements ExternalSchema {
+        private final Map<String, IgniteTable> tables = Map.of(TEST_TABLE_NAME, new TestTableImpl(
+                new TableDescriptorImpl(
+                        List.of(
+                                new ColumnDescriptorImpl("NODE_ID", true, 0, NativeTypes.stringOf(256), null),
+                                new ColumnDescriptorImpl("NUM", true, 1, NativeTypes.INT32, null),
+                                new ColumnDescriptorImpl("VAL", false, 2, NativeTypes.stringOf(256), null)
+                        )
+                )
+        ));
+
+        /** {@inheritDoc} */
+        @Override
+        public List<String> tableNames() {
+            return List.of(TEST_TABLE_NAME);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public @Nullable IgniteTable table(String name) {
+            return tables.get(name);
+        }
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestFilterConverterRule.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestFilterConverterRule.java
new file mode 100644
index 0000000..d31b5c4
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestFilterConverterRule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A converter from logical filter to the test physical filter relation.
+ */
+public class TestFilterConverterRule extends ConverterRule {
+    /** An instance of this rule. */
+    public static final RelOptRule INSTANCE = new TestFilterConverterRule();
+
+    private TestFilterConverterRule() {
+        super(Config.INSTANCE
+                .withConversion(LogicalFilter.class, Convention.NONE, TestExtension.CONVENTION, "TestFilterConverter"));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable RelNode convert(RelNode rel) {
+        RelOptCluster cluster = rel.getCluster();
+        RelTraitSet traits = rel.getTraitSet()
+                .replace(TestExtension.CONVENTION)
+                .replace(IgniteDistributions.single());
+
+        LogicalFilter filter = (LogicalFilter) rel;
+
+        RelTraitSet inputTraits = filter.getInput().getTraitSet()
+                .replace(TestExtension.CONVENTION)
+                .replace(IgniteDistributions.single());
+
+        return new TestPhysFilter(cluster, traits, convert(filter.getInput(), inputTraits), filter.getCondition());
+    }
+
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysFilter.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysFilter.java
new file mode 100644
index 0000000..1452052
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysFilter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
+
+/**
+ * A test filter node.
+ */
+public class TestPhysFilter extends Filter implements TraitsAwareIgniteRel {
+    /**
+     * Constructor.
+     *
+     * @param cluster   Cluster this node belongs to.
+     * @param traits    A set of the traits this node satisfy.
+     * @param input     An input relation.
+     * @param condition The predicate to filter put input rows.
+     */
+    public TestPhysFilter(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode condition) {
+        super(cluster, traits, input, condition);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param input Context to recover this relation from.
+     */
+    public TestPhysFilter(RelInput input) {
+        super(changeTraits(input, TestExtension.CONVENTION));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+        return new TestPhysFilter(getCluster(), traitSet, input, condition);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return planner.getCostFactory().makeZeroCost();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new TestPhysFilter(cluster, getTraitSet(), sole(inputs), getCondition());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(RelTraitSet nodeTraits,
+            List<RelTraitSet> inTraits) {
+        if (!TraitUtils.rewindability(inTraits.get(0)).rewindable() && RexUtils.hasCorrelation(getCondition())) {
+            return List.of();
+        }
+
+        return List.of(Pair.of(nodeTraits.replace(TraitUtils.rewindability(inTraits.get(0))),
+                inTraits));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(RelTraitSet nodeTraits,
+            List<RelTraitSet> inTraits) {
+        return List.of(Pair.of(nodeTraits.replace(TraitUtils.distribution(inTraits.get(0))),
+                inTraits));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits,
+            List<RelTraitSet> inTraits) {
+        return List.of(Pair.of(nodeTraits.replace(TraitUtils.collation(inTraits.get(0))),
+                inTraits));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCorrelation(RelTraitSet nodeTraits,
+            List<RelTraitSet> inTraits) {
+        Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(getCondition());
+
+        corrIds.addAll(TraitUtils.correlation(inTraits.get(0)).correlationIds());
+
+        return List.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)), inTraits));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public List<RelNode> derive(List<List<RelTraitSet>> inputTraits) {
+        return TraitUtils.derive(TestExtension.CONVENTION, this, inputTraits);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(RelTraitSet required) {
+        return TraitUtils.passThrough(TestExtension.CONVENTION, this, required);
+    }
+
+
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysTableScan.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysTableScan.java
new file mode 100644
index 0000000..0fc0a3b
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestPhysTableScan.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A scan over test table.
+ */
+public class TestPhysTableScan extends TableScan implements IgniteRel {
+    /**
+     * Constructor.
+     *
+     * @param cluster  Cluster this node belongs to.
+     * @param traitSet A set of the traits this node satisfy.
+     * @param hints    A list of hints applicable to the current node.
+     * @param table    The actual table to be scanned.
+     */
+    protected TestPhysTableScan(RelOptCluster cluster, RelTraitSet traitSet,
+            List<RelHint> hints, RelOptTable table) {
+        super(cluster, traitSet, hints, table);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param input Context to recover this relation from.
+     */
+    public TestPhysTableScan(RelInput input) {
+        super(changeTraits(input, TestExtension.CONVENTION));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return (T) this;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new TestPhysTableScan(cluster, getTraitSet(), getHints(), getTable());
+    }
+
+    @Override
+    public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return super.computeSelfCost(planner, mq);
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestTableImpl.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestTableImpl.java
new file mode 100644
index 0000000..37191d8
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/calcite/extension/TestTableImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.calcite.extension;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A test table implementation.
+ */
+class TestTableImpl extends AbstractTable implements IgniteTable {
+    private final TableDescriptor desc;
+
+    /**
+     * Constructor.
+     *
+     * @param desc A descriptor of the table.
+     */
+    public TestTableImpl(TableDescriptor desc) {
+        this.desc = desc;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public TableDescriptor descriptor() {
+        return desc;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet requiredColumns) {
+        return desc.rowType((IgniteTypeFactory) typeFactory, requiredColumns);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Statistic getStatistic() {
+        return new Statistic() {
+            @Override
+            public @Nullable List<RelCollation> getCollations() {
+                return List.of();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public TableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl) {
+        RelTraitSet traits = cluster.traitSetOf(TestExtension.CONVENTION)
+                .replace(desc.distribution());
+
+        return new TestPhysTableScan(
+                cluster, traits, List.of(), relOptTbl
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteDistribution distribution() {
+        return desc.distribution();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <C> C unwrap(Class<C> cls) {
+        if (cls.isInstance(desc)) {
+            return cls.cast(desc);
+        }
+
+        return super.unwrap(cls);
+    }
+}
diff --git a/modules/runner/src/integrationTest/resources/META-INF/services/org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension b/modules/runner/src/integrationTest/resources/META-INF/services/org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension
new file mode 100644
index 0000000..4de4690
--- /dev/null
+++ b/modules/runner/src/integrationTest/resources/META-INF/services/org.apache.ignite.internal.processors.query.calcite.extension.SqlExtension
@@ -0,0 +1 @@
+org.apache.ignite.internal.calcite.extension.TestExtension
\ No newline at end of file
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0bd11d3..c529ff2 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -385,6 +385,13 @@ public class IgniteImpl implements Ignite {
     }
 
     /**
+     * Returns the id of the current node.
+     */
+    public String id() {
+        return clusterSvc.topologyService().localMember().id();
+    }
+
+    /**
      * Checks node status. If it's {@link Status#STOPPING} then prevents further starting and throws NodeStoppingException that will lead to
      * stopping already started components later on, otherwise starts component and add it to started components list.
      *