You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2023/01/11 06:58:09 UTC

[ignite-3] branch main updated: IGNITE-17275 Sql. Performance testing of pure SQL execution (#1483)

This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cb7cade340 IGNITE-17275 Sql. Performance testing of pure SQL execution (#1483)
cb7cade340 is described below

commit cb7cade340edd971ba7c8924fc8e76a0240d9db4
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Wed Jan 11 08:58:03 2023 +0200

    IGNITE-17275 Sql. Performance testing of pure SQL execution (#1483)
---
 .../internal/testframework/IgniteTestUtils.java    |   2 +-
 modules/sql-engine/build.gradle                    |   2 +
 modules/sql-engine/pom.xml                         |  25 ++
 .../internal/sql/engine/SqlQueryProcessor.java     |  10 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |  28 +-
 .../sql/engine/schema/ColumnDescriptorImpl.java    |   3 +-
 .../internal/sql/engine/schema/IgniteSchema.java   |  21 +-
 .../sql/engine/benchmarks/SqlBenchmark.java        | 150 +++++++++
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  40 +--
 .../sql/engine/framework/DataProvider.java         |  43 +++
 .../engine/framework/PredefinedSchemaManager.java  |  80 +++++
 .../sql/engine/framework/TestBuilders.java         | 351 +++++++++++++++++++++
 .../internal/sql/engine/framework/TestCluster.java |  66 ++++
 .../sql/engine/framework/TestClusterService.java   | 198 ++++++++++++
 .../internal/sql/engine/framework/TestNode.java    | 199 ++++++++++++
 .../internal/sql/engine/framework/TestTable.java   | 300 ++++++++++++++++++
 .../sql/engine/planner/AbstractPlannerTest.java    |  33 +-
 17 files changed, 1473 insertions(+), 78 deletions(-)

diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index a7a700a0ed..aae2063c3d 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -58,7 +58,7 @@ import org.junit.jupiter.api.TestInfo;
 public final class IgniteTestUtils {
     private static final IgniteLogger LOG = Loggers.forClass(IgniteTestUtils.class);
 
-    private static final int TIMEOUT_SEC = 5000;
+    private static final int TIMEOUT_SEC = 30;
 
     /**
      * Set object field value via reflection.
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 30999cac1e..117e58d885 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -54,6 +54,7 @@ dependencies {
     annotationProcessor libs.value.annotation.processor
 
     testAnnotationProcessor project(':ignite-network-annotation-processor')
+    testAnnotationProcessor libs.jmh.annotation.processor
     testAnnotationProcessor libs.value.annotation.processor
     testImplementation project(':ignite-core')
     testImplementation project(':ignite-baseline')
@@ -65,6 +66,7 @@ dependencies {
     testImplementation project(':ignite-storage-rocksdb')
     testImplementation project(':ignite-cluster-management')
     testImplementation project(':ignite-vault')
+    testImplementation libs.jmh.core
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation(testFixtures(project(':ignite-configuration')))
     testImplementation(testFixtures(project(':ignite-storage-api')))
diff --git a/modules/sql-engine/pom.xml b/modules/sql-engine/pom.xml
index 6ad7ca90dc..4066df87cf 100644
--- a/modules/sql-engine/pom.xml
+++ b/modules/sql-engine/pom.xml
@@ -208,6 +208,19 @@
             <artifactId>slf4j-jdk14</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <!-- Benchmark dependencies -->
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -313,6 +326,12 @@
                         <artifactId>value</artifactId>
                         <version>${immutables.version}</version>
                     </dependency>
+
+                    <dependency>
+                        <groupId>org.openjdk.jmh</groupId>
+                        <artifactId>jmh-generator-annprocess</artifactId>
+                        <version>${jmh.framework.version}</version>
+                    </dependency>
                 </dependencies>
                 <configuration>
 <!--                    <compilerArgs>
@@ -330,6 +349,12 @@
                             <artifactId>value</artifactId>
                             <version>${immutables.version}</version>
                         </path>
+
+                        <path>
+                            <groupId>org.openjdk.jmh</groupId>
+                            <artifactId>jmh-generator-annprocess</artifactId>
+                            <version>${jmh.framework.version}</version>
+                        </path>
                     </annotationProcessorPaths>
                 </configuration>
             </plugin>
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 0ce7773d9e..eecaf29519 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
@@ -210,18 +211,17 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         this.prepareSvc = prepareSvc;
 
+        var ddlCommandHandler = new DdlCommandHandler(distributionZoneManager, tableManager, indexManager, dataStorageManager);
+
         var executionSrvc = registerService(ExecutionServiceImpl.create(
                 clusterSrvc.topologyService(),
                 msgSrvc,
                 sqlSchemaManager,
-                distributionZoneManager,
-                tableManager,
-                indexManager,
+                ddlCommandHandler,
                 taskExecutor,
                 ArrayRowHandler.INSTANCE,
                 mailboxRegistry,
-                exchangeService,
-                dataStorageManager
+                exchangeService
         ));
 
         clusterSrvc.topologyService().addEventHandler(executionSrvc);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 37a243efce..9b4d7027eb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -39,9 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.configuration.ConfigurationChangeException;
-import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.AsyncCursor;
@@ -72,8 +70,6 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
-import org.apache.ignite.internal.storage.DataStorageManager;
-import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -117,13 +113,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
      * @param topSrvc Topology service.
      * @param msgSrvc Message service.
      * @param sqlSchemaManager Schema manager.
-     * @param indexManager Index manager.
-     * @param tblManager Table manager.
+     * @param ddlCommandHandler Handler of the DDL commands.
      * @param taskExecutor Task executor.
      * @param handler Row handler.
      * @param mailboxRegistry Mailbox registry.
      * @param exchangeSrvc Exchange service.
-     * @param dataStorageManager Storage manager.
      * @param <RowT> Type of the sql row.
      * @return An execution service.
      */
@@ -131,21 +125,18 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             TopologyService topSrvc,
             MessageService msgSrvc,
             SqlSchemaManager sqlSchemaManager,
-            DistributionZoneManager distributionZoneManager,
-            TableManager tblManager,
-            IndexManager indexManager,
+            DdlCommandHandler ddlCommandHandler,
             QueryTaskExecutor taskExecutor,
             RowHandler<RowT> handler,
             MailboxRegistry mailboxRegistry,
-            ExchangeService exchangeSrvc,
-            DataStorageManager dataStorageManager
+            ExchangeService exchangeSrvc
     ) {
         return new ExecutionServiceImpl<>(
                 topSrvc.localMember(),
                 msgSrvc,
                 new MappingServiceImpl(topSrvc),
                 sqlSchemaManager,
-                new DdlCommandHandler(distributionZoneManager, tblManager, indexManager, dataStorageManager),
+                ddlCommandHandler,
                 taskExecutor,
                 handler,
                 exchangeSrvc,
@@ -161,7 +152,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
     /**
      * Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    ExecutionServiceImpl(
+    public ExecutionServiceImpl(
             ClusterNode localNode,
             MessageService msgSrvc,
             MappingService mappingSrvc,
@@ -718,8 +709,15 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         }
     }
 
+    /**
+     * A factory of the relational node implementors.
+     *
+     * @param <RowT> A type of the row the execution tree will be working with.
+     * @see LogicalRelImplementor
+     */
     @FunctionalInterface
-    interface ImplementorFactory<RowT> {
+    public interface ImplementorFactory<RowT> {
+        /** Creates the relational node implementor with the given context. */
         LogicalRelImplementor<RowT> create(ExecutionContext<RowT> ctx);
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
index 76f56d23a3..2dcc455ecb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.schema;
 import java.util.Objects;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.schema.NativeType;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Simple implementation of {@link ColumnDescriptor}.
@@ -65,7 +66,7 @@ public class ColumnDescriptorImpl implements ColumnDescriptor {
             int physicalIndex,
             NativeType type,
             DefaultValueStrategy defaultStrategy,
-            Supplier<Object> dfltVal
+            @Nullable Supplier<Object> dfltVal
     ) {
         this.key = key;
         this.nullable = nullable;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 52aca0781d..f989c9bfeb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite schema.
@@ -35,20 +36,26 @@ public class IgniteSchema extends AbstractSchema {
     private final Map<UUID, IgniteIndex> idxMap;
 
     /**
-     * Creates a Schema.
+     * Creates a Schema with given tables and indexes.
      *
-     * @param schemaName Schema name.
+     * @param schemaName A name of the schema to create.
+     * @param tableMap A collection of a tables belonging to the schema.
+     * @param indexMap A collection of an indexes belonging to the schema.
      */
-    public IgniteSchema(String schemaName, Map<String, Table> tblMap, Map<UUID, IgniteIndex> idxMap) {
+    public IgniteSchema(
+            String schemaName,
+            @Nullable Map<String, Table> tableMap,
+            @Nullable Map<UUID, IgniteIndex> indexMap
+    ) {
         this.schemaName = schemaName;
-        this.tblMap = tblMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(tblMap);
-        this.idxMap = idxMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(idxMap);
+        this.tblMap = tableMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(tableMap);
+        this.idxMap = indexMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(indexMap);
     }
 
     /**
-     * Creates a Schema.
+     * Creates an empty Schema.
      *
-     * @param schemaName Schema name.
+     * @param schemaName A name of the schema to create.
      */
     public IgniteSchema(String schemaName) {
         this(schemaName, null, null);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
new file mode 100644
index 0000000000..7a5d239c77
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.benchmarks;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.sql.engine.framework.DataProvider;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * A micro-benchmark of sql execution.
+ */
+@Warmup(iterations = 20, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 20, time = 1, timeUnit = TimeUnit.SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Benchmark)
+public class SqlBenchmark {
+    private final DataProvider<Object[]> dataProvider = new SameRowDataProvider(
+            new Object[] {42, UUID.randomUUID().toString()}, 333
+    );
+
+    // @formatter:off
+    private final TestCluster cluster = TestBuilders.cluster()
+            .nodes("N1", "N2", "N3")
+            .addTable()
+                    .name("T1")
+                    .distribution(IgniteDistributions.hash(List.of(0)))
+                    .addColumn("ID", NativeTypes.INT32)
+                    .addColumn("VAL", NativeTypes.stringOf(64))
+                    .defaultDataProvider(dataProvider)
+                    .end()
+            .build();
+    // @formatter:on
+
+    private final TestNode gatewayNode = cluster.node("N1");
+
+    private QueryPlan plan;
+
+    /** Starts the cluster and prepares the plan of the query. */
+    @Setup
+    public void setUp() {
+        cluster.start();
+
+        plan = gatewayNode.prepare("SELECT * FROM t1");
+    }
+
+    /** Stops the cluster. */
+    @TearDown
+    public void tearDown() throws Exception {
+        cluster.stop();
+    }
+
+    /** Very simple test to measure performance of minimal possible distributed query. */
+    @Benchmark
+    public void selectAllSimple(Blackhole bh) {
+        for (var row : await(gatewayNode.executePlan(plan).requestNextAsync(1_000)).items()) {
+            bh.consume(row);
+        }
+    }
+
+    /**
+     * Runs the benchmark.
+     *
+     * @param args args
+     * @throws Exception if something goes wrong
+     */
+    public static void main(String[] args) throws Exception {
+        Options build = new OptionsBuilder()
+                //.addProfiler("gc")
+                .include(SqlBenchmark.class.getName())
+                .build();
+
+        new Runner(build).run();
+    }
+
+    private static class SameRowDataProvider implements DataProvider<Object[]> {
+        private final int times;
+        private final Object[] row;
+
+        SameRowDataProvider(Object[] row, int times) {
+            this.times = times;
+            this.row = row;
+        }
+
+        @Override
+        public Iterator<Object[]> iterator() {
+            return new Iterator<>() {
+                private int counter;
+
+                @Override
+                public boolean hasNext() {
+                    return counter < times;
+                }
+
+                @Override
+                public Object[] next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+
+                    counter++;
+
+                    return row;
+                }
+            };
+        }
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 92e963db86..49771678a9 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import static org.apache.ignite.internal.sql.engine.util.BaseQueryContext.CLUSTER;
 import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -35,6 +34,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -45,18 +45,20 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.rel.Node;
 import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
+import org.apache.ignite.internal.sql.engine.framework.TestTable;
 import org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
 import org.apache.ignite.internal.sql.engine.message.MessageListener;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
@@ -64,14 +66,16 @@ import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
-import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTable;
-import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -108,7 +112,7 @@ public class ExecutionServiceImplTest {
     );
 
     private final TestTable table = createTable("TEST_TBL", 1_000_000, IgniteDistributions.random(),
-            "ID", Integer.class, "VAL", Integer.class);
+            "ID", NativeTypes.INT32, "VAL", NativeTypes.INT32);
 
     private final IgniteSchema schema = new IgniteSchema("PUBLIC", Map.of(table.name(), table), null);
 
@@ -588,22 +592,22 @@ public class ExecutionServiceImplTest {
             throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
         }
 
-        RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(CLUSTER.getTypeFactory());
+        List<ColumnDescriptor> columns = new ArrayList<>();
 
         for (int i = 0; i < fields.length; i += 2) {
-            b.add((String) fields[i], CLUSTER.getTypeFactory().createJavaType((Class<?>) fields[i + 1]));
+            columns.add(
+                    new ColumnDescriptorImpl(
+                            (String) fields[i], false, true, i, i,
+                            (NativeType) fields[i + 1], DefaultValueStrategy.DEFAULT_NULL, null
+                    )
+            );
         }
 
-        return new TestTable(name, b.build(), size) {
-            @Override
-            public IgniteDistribution distribution() {
-                return distr;
-            }
-
-            @Override
-            public ColocationGroup colocationGroup(MappingQueryContext ctx) {
-                return ColocationGroup.forNodes(nodeNames);
-            }
-        };
+        return new TestTable(
+                new TableDescriptorImpl(columns, distr),
+                name,
+                ColocationGroup.forNodes(nodeNames),
+                size
+        );
     }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
new file mode 100644
index 0000000000..a143f4c6ae
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+
+/**
+ * Producer of the rows to use with {@link TestTable} in execution-related scenarios.
+ *
+ * <p>A data provider is supposed to be created for table on per-node basis. It's up
+ * to developer to keep data provider in sync with the schema of the table this data provider relates to.
+ *
+ * @param <T> A type of the produced elements.
+ * @see TestTable
+ */
+@FunctionalInterface
+public interface DataProvider<T> extends Iterable<T> {
+    /**
+     * Creates data provider from given collection.
+     *
+     * @param collection Collection to use as source of data.
+     * @param <T> A type of the produced elements.
+     * @return A data provider instance backed by given collection.
+     */
+    static <T> DataProvider<T> fromCollection(Collection<T> collection) {
+        return collection::iterator;
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
new file mode 100644
index 0000000000..bd9e9bc4ce
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy wrapper for predefined collection of schemas.
+ *
+ * <p>Accepts collection of {@link IgniteSchema schemas} as parameter and implements required
+ * methods of {@link SqlSchemaManager} around them. Assumes given schemas will never be changed.
+ *
+ * @see IgniteSchema
+ * @see SqlSchemaManager
+ */
+public class PredefinedSchemaManager implements SqlSchemaManager {
+    private final SchemaPlus root;
+    private final Map<UUID, IgniteTable> tableById;
+
+    /** Constructs schema manager from a single schema. */
+    public PredefinedSchemaManager(IgniteSchema schema) {
+        this(List.of(schema));
+    }
+
+    /** Constructs schema manager from a collection of schemas. */
+    public PredefinedSchemaManager(Collection<IgniteSchema> schemas) {
+        this.root = Frameworks.createRootSchema(false);
+        this.tableById = new HashMap<>();
+
+        for (IgniteSchema schema : schemas) {
+            root.add(schema.getName(), schema);
+
+            tableById.putAll(
+                    schema.getTableNames().stream()
+                            .map(schema::getTable)
+                            .map(IgniteTable.class::cast)
+                            .collect(Collectors.toMap(IgniteTable::id, Function.identity()))
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SchemaPlus schema(@Nullable String schema) {
+        return schema == null ? root : root.getSubSchema(schema);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteTable tableById(UUID id, int ver) {
+        return tableById.get(id);
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
new file mode 100644
index 0000000000..6a6ae1bb52
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.Table;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+
+/**
+ * A collection of builders to create test objects.
+ */
+public class TestBuilders {
+    /** Returns a builder of the test cluster object. */
+    public static ClusterBuilder cluster() {
+        return new ClusterBuilderImpl();
+    }
+
+    /** Returns a builder of the test table object. */
+    public static TableBuilder table() {
+        return new TableBuilderImpl();
+    }
+
+    /**
+     * A builder to create a test cluster object.
+     *
+     * @see TestCluster
+     */
+    public interface ClusterBuilder {
+        /**
+         * Sets desired names for the cluster nodes.
+         *
+         * @param firstNodeName A name of the first node. There is no difference in what node should be first. This parameter was introduced
+         *     to force user to provide at least one node name.
+         * @param otherNodeNames An array of rest of the names to create cluster from.
+         * @return {@code this} for chaining.
+         */
+        ClusterBuilder nodes(String firstNodeName, String... otherNodeNames);
+
+        /**
+         * Creates a table builder to add to the cluster.
+         *
+         * @return An instance of table builder.
+         */
+        ClusterTableBuilder addTable();
+
+        /**
+         * Builds the cluster object.
+         *
+         * @return Created cluster object.
+         */
+        TestCluster build();
+    }
+
+    /**
+     * A builder to create a test table object.
+     *
+     * @see TestTable
+     */
+    public interface TableBuilder extends TableBuilderBase<TableBuilder> {
+        /**
+         * Builds a table.
+         *
+         * @return Created table object.
+         */
+        public TestTable build();
+    }
+
+    /**
+     * A builder to create a test table as nested object of the cluster.
+     *
+     * @see TestTable
+     * @see TestCluster
+     */
+    public interface ClusterTableBuilder extends TableBuilderBase<ClusterTableBuilder>, NestedBuilder<ClusterBuilder> {
+        /**
+         * Adds a default data provider, which will be used for those nodes for which no specific provider is specified.
+         *
+         * <p>Note: this method will force all nodes in the cluster to have a data provider for the given table.
+         */
+        ClusterTableBuilder defaultDataProvider(DataProvider<?> dataProvider);
+    }
+
+    private static class ClusterBuilderImpl implements ClusterBuilder {
+        private final List<ClusterTableBuilderImpl> tableBuilders = new ArrayList<>();
+        private List<String> nodeNames;
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterBuilder nodes(String firstNodeName, String... otherNodeNames) {
+            this.nodeNames = new ArrayList<>();
+
+            nodeNames.add(firstNodeName);
+            nodeNames.addAll(Arrays.asList(otherNodeNames));
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterTableBuilder addTable() {
+            return new ClusterTableBuilderImpl(this);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public TestCluster build() {
+            var clusterService = new TestClusterService(nodeNames);
+
+            for (ClusterTableBuilderImpl tableBuilder : tableBuilders) {
+                validateTableBuilder(tableBuilder);
+                injectDataProvidersIfNeeded(tableBuilder);
+            }
+
+            Map<String, Table> tableMap = tableBuilders.stream()
+                    .map(ClusterTableBuilderImpl::build)
+                    .collect(Collectors.toMap(TestTable::name, Function.identity()));
+
+            var schemaManager = new PredefinedSchemaManager(new IgniteSchema("PUBLIC", tableMap, null));
+
+            Map<String, TestNode> nodes = nodeNames.stream()
+                    .map(name -> new TestNode(name, clusterService.spawnForNode(name), schemaManager))
+                    .collect(Collectors.toMap(TestNode::name, Function.identity()));
+
+            return new TestCluster(nodes);
+        }
+
+        private void validateTableBuilder(ClusterTableBuilderImpl tableBuilder) {
+            Set<String> tableOwners = new HashSet<>(tableBuilder.dataProviders.keySet());
+
+            tableOwners.removeAll(nodeNames);
+
+            if (!tableOwners.isEmpty()) {
+                throw new AssertionError(format("The table has a dataProvider that is outside the cluster "
+                        + "[tableName={}, outsiders={}]", tableBuilder.name, tableOwners));
+            }
+        }
+
+        private void injectDataProvidersIfNeeded(ClusterTableBuilderImpl tableBuilder) {
+            if (tableBuilder.defaultDataProvider == null) {
+                return;
+            }
+
+            Set<String> nodesWithoutDataProvider = new HashSet<>(nodeNames);
+
+            nodesWithoutDataProvider.removeAll(tableBuilder.dataProviders.keySet());
+
+            for (String name : nodesWithoutDataProvider) {
+                tableBuilder.addDataProvider(name, tableBuilder.defaultDataProvider);
+            }
+        }
+    }
+
+    private static class TableBuilderImpl extends AbstractTableBuilderImpl<TableBuilder> implements TableBuilder {
+        /** {@inheritDoc} */
+        @Override
+        public TestTable build() {
+            return new TestTable(
+                    new TableDescriptorImpl(columns, distribution), name, dataProviders, size
+            );
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected TableBuilder self() {
+            return this;
+        }
+    }
+
+    private static class ClusterTableBuilderImpl extends AbstractTableBuilderImpl<ClusterTableBuilder> implements ClusterTableBuilder {
+        private final ClusterBuilderImpl parent;
+
+        private DataProvider<?> defaultDataProvider = null;
+
+        private ClusterTableBuilderImpl(ClusterBuilderImpl parent) {
+            this.parent = parent;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected ClusterTableBuilder self() {
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterTableBuilder defaultDataProvider(DataProvider<?> dataProvider) {
+            this.defaultDataProvider = dataProvider;
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterBuilder end() {
+            parent.tableBuilders.add(this);
+
+            return parent;
+        }
+
+        private TestTable build() {
+            return new TestTable(
+                    new TableDescriptorImpl(columns, distribution), name, dataProviders, size
+            );
+        }
+    }
+
+    private abstract static class AbstractTableBuilderImpl<ChildT> implements TableBuilderBase<ChildT> {
+        protected final List<ColumnDescriptor> columns = new ArrayList<>();
+        protected final Map<String, DataProvider<?>> dataProviders = new HashMap<>();
+
+        protected String name;
+        protected IgniteDistribution distribution;
+        protected int size = 100_000;
+
+        protected abstract ChildT self();
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT name(String name) {
+            this.name = name;
+
+            return self();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT distribution(IgniteDistribution distribution) {
+            this.distribution = distribution;
+
+            return self();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT addColumn(String name, NativeType type) {
+            columns.add(new ColumnDescriptorImpl(
+                    name, false, true, columns.size(), columns.size(), type, DefaultValueStrategy.DEFAULT_NULL, null
+            ));
+
+            return self();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT addDataProvider(String targetNode, DataProvider<?> dataProvider) {
+            this.dataProviders.put(targetNode, dataProvider);
+
+            return self();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT size(int size) {
+            this.size = size;
+
+            return self();
+        }
+    }
+
+    /**
+     * Base interface describing the complete set of table-related fields.
+     *
+     * <p>The sole purpose of this interface is to keep in sync both variants of table's builders.
+     *
+     * @param <ChildT> An actual type of builder that should be exposed to the user.
+     * @see ClusterTableBuilder
+     * @see TableBuilder
+     */
+    private interface TableBuilderBase<ChildT> {
+        /** Sets the name of the table. */
+        ChildT name(String name);
+
+        /** Sets the distribution of the table. */
+        ChildT distribution(IgniteDistribution distribution);
+
+        /** Adds a column to the table. */
+        ChildT addColumn(String name, NativeType type);
+
+        /** Adds a data provider for the given node to the table. */
+        ChildT addDataProvider(String targetNode, DataProvider<?> dataProvider);
+
+        /** Sets the size of the table. */
+        ChildT size(int size);
+    }
+
+    /**
+     * This interfaces provides a nested builder with ability to return on the previous layer.
+     *
+     * <p>For example:</p>
+     * <pre>
+     *     interface ChildBuilder implements NestedBuilder&lt;ParentBuilder&gt; {
+     *         ChildBuilder nestedFoo();
+     *     }
+     *
+     *     interface ParentBuilder {
+     *         ParentBuilder foo();
+     *         ParentBuilder bar();
+     *         ChildBuilder child();
+     *     }
+     *
+     *     Builders.parent()
+     *         .foo()
+     *         .child() // now we are dealing with the ChildBuilder
+     *             .nestedFoo()
+     *             .end() // and here we are returning back to the ParentBuilder
+     *         .bar()
+     *         .build()
+     * </pre>
+     */
+    @FunctionalInterface
+    private interface NestedBuilder<ParentT> {
+        /**
+         * Notifies the builder's chain of the nested builder that we need to return back to the
+         * previous layer.
+         *
+         * @return An instance of the parent builder.
+         */
+        ParentT end();
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
new file mode 100644
index 0000000000..9a660217be
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * A test cluster object.
+ *
+ * <p>This is convenient holder of collection of nodes which provides methods for centralised
+ * accessing and management.
+ *
+ * <p>NB: do not forget to {@link #start()} cluster before use, and {@link #stop()} the cluster after.
+ */
+public class TestCluster implements LifecycleAware {
+    private final Map<String, TestNode> nodeByName;
+
+    TestCluster(Map<String, TestNode> nodeByName) {
+        this.nodeByName = nodeByName;
+    }
+
+    /**
+     * Returns the node for the given name, if exists.
+     *
+     * @param name A name of the node of interest.
+     * @return A test node or {@code null} if there is no node with such name.
+     */
+    public TestNode node(String name) {
+        return nodeByName.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        nodeByName.values().forEach(TestNode::start);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        List<AutoCloseable> closeables = nodeByName.values().stream()
+                .map(node -> ((AutoCloseable) node::stop))
+                .collect(Collectors.toList());
+
+        IgniteUtils.closeAll(closeables);
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
new file mode 100644
index 0000000000..a3cf08296d
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.AbstractMessagingService;
+import org.apache.ignite.network.AbstractTopologyService;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.NodeMetadata;
+import org.apache.ignite.network.TopologyService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Auxiliary object to create the associated {@link MessagingService}
+ * and {@link TopologyService} for each node in the cluster.
+ */
+class TestClusterService {
+    private final List<String> allNodes;
+
+    private final Map<String, LocalMessagingService> messagingServicesByNode = new ConcurrentHashMap<>();
+    private final Map<String, LocalTopologyService> topologyServicesByNode = new ConcurrentHashMap<>();
+
+    /**
+     * Creates a cluster service object for given collection of nodes.
+     *
+     * @param allNodes A collection of nodes to create cluster service from.
+     */
+    TestClusterService(List<String> allNodes) {
+        this.allNodes = allNodes;
+    }
+
+    ClusterService spawnForNode(String nodeName) {
+        return new ClusterService() {
+            /** {@inheritDoc} */
+            @Override
+            public TopologyService topologyService() {
+                return topologyServicesByNode.computeIfAbsent(nodeName, name -> new LocalTopologyService(name, allNodes));
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public MessagingService messagingService() {
+                return messagingServicesByNode.computeIfAbsent(nodeName, LocalMessagingService::new);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public ClusterLocalConfiguration localConfiguration() {
+                throw new AssertionError("Should not be called");
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public boolean isStopped() {
+                return false;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void updateMetadata(NodeMetadata metadata) {
+                throw new AssertionError("Should not be called");
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void start() {
+
+            }
+        };
+    }
+
+    private static class LocalTopologyService extends AbstractTopologyService {
+        private static final AtomicInteger NODE_COUNTER = new AtomicInteger(1);
+
+        private final ClusterNode localMember;
+        private final Map<String, ClusterNode> allMembers;
+        private final Map<NetworkAddress, ClusterNode> allMembersByAddress;
+
+        private LocalTopologyService(String localMember, List<String> allMembers) {
+            this.allMembers = allMembers.stream()
+                    .map(LocalTopologyService::nodeFromName)
+                    .collect(Collectors.toMap(ClusterNode::name, Function.identity()));
+
+            this.localMember = this.allMembers.get(localMember);
+
+            if (this.localMember == null) {
+                throw new IllegalArgumentException("Local member is not part of all members");
+            }
+
+            this.allMembersByAddress = new HashMap<>();
+
+            this.allMembers.forEach((ignored, member) -> allMembersByAddress.put(member.address(), member));
+        }
+
+        private static ClusterNode nodeFromName(String name) {
+            return new ClusterNode(name, name, NetworkAddress.from("127.0.0.1:" + NODE_COUNTER.incrementAndGet()));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterNode localMember() {
+            return localMember;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Collection<ClusterNode> allMembers() {
+            return allMembers.values();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public @Nullable ClusterNode getByAddress(NetworkAddress addr) {
+            return allMembersByAddress.get(addr);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public @Nullable ClusterNode getByConsistentId(String consistentId) {
+            return allMembers.get(consistentId);
+        }
+    }
+
+    private class LocalMessagingService extends AbstractMessagingService {
+        private final String localNodeName;
+
+        private LocalMessagingService(String localNodeName) {
+            this.localNodeName = localNodeName;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+            throw new AssertionError("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+            for (var handler : messagingServicesByNode.get(recipient.name()).messageHandlers(msg.groupType())) {
+                handler.onReceived(msg, localNodeName, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
+            throw new AssertionError("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+            throw new AssertionError("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+            throw new AssertionError("Not implemented yet");
+        }
+
+        private Collection<NetworkMessageHandler> messageHandlers(short groupType) {
+            return getMessageHandlers(groupType);
+        }
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
new file mode 100644
index 0000000000..cba801716e
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.sql.engine.exec.LogicalRelImplementor;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.rel.Node;
+import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
+import org.apache.ignite.internal.sql.engine.message.MessageService;
+import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
+import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * An object representing a node in test cluster.
+ *
+ * <p>Provides convenient access to the methods for optimization and execution of the queries.
+ */
+public class TestNode implements LifecycleAware {
+    private final String nodeName;
+    private final SchemaPlus schema;
+    private final PrepareService prepareService;
+    private final ExecutionService executionService;
+
+    private final List<LifecycleAware> services = new ArrayList<>();
+
+    /**
+     * Constructs the object.
+     *
+     * @param nodeName A name of the node to create.
+     * @param clusterService A cluster service.
+     * @param schemaManager A schema manager to use for query planning and execution.
+     */
+    TestNode(
+            String nodeName,
+            ClusterService clusterService,
+            SqlSchemaManager schemaManager
+    ) {
+        this.nodeName = nodeName;
+        this.prepareService = registerService(new PrepareServiceImpl(nodeName, 0, mock(DdlSqlToCommandConverter.class)));
+        this.schema = schemaManager.schema("PUBLIC");
+
+        TopologyService topologyService = clusterService.topologyService();
+        MessagingService messagingService = clusterService.messagingService();
+        RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
+
+        MailboxRegistry mailboxRegistry = registerService(new MailboxRegistryImpl());
+        QueryTaskExecutor taskExecutor = registerService(new QueryTaskExecutorImpl(nodeName));
+
+        MessageService messageService = registerService(new MessageServiceImpl(
+                topologyService, messagingService, taskExecutor, new IgniteSpinBusyLock()
+        ));
+        ExchangeService exchangeService = registerService(new ExchangeServiceImpl(
+                topologyService.localMember(), taskExecutor, mailboxRegistry, messageService
+        ));
+
+        executionService = registerService(new ExecutionServiceImpl<>(
+                topologyService.localMember(),
+                messageService,
+                new MappingServiceImpl(topologyService),
+                schemaManager,
+                mock(DdlCommandHandler.class),
+                taskExecutor,
+                rowHandler,
+                exchangeService,
+                ctx -> new LogicalRelImplementor<Object[]>(
+                        ctx,
+                        new HashFunctionFactoryImpl<>(schemaManager, rowHandler),
+                        mailboxRegistry,
+                        exchangeService
+                ) {
+                    @Override
+                    public Node<Object[]> visit(IgniteTableScan rel) {
+                        DataProvider<Object[]> dataProvider = rel.getTable().unwrap(TestTable.class).dataProvider(ctx.localNode().name());
+
+                        return new ScanNode<>(ctx, dataProvider);
+                    }
+                }
+        ));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        services.forEach(LifecycleAware::start);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        List<AutoCloseable> closeables = services.stream()
+                .map(service -> ((AutoCloseable) service::stop))
+                .collect(Collectors.toList());
+
+        Collections.reverse(closeables);
+        IgniteUtils.closeAll(closeables);
+    }
+
+    /** Returns the name of the current node. */
+    public String name() {
+        return nodeName;
+    }
+
+    /**
+     * Executes given plan on a cluster this node belongs to
+     * and returns an async cursor representing the result.
+     *
+     * @param plan A plan to execute.
+     * @return A cursor representing the result.
+     */
+    public AsyncCursor<List<Object>> executePlan(QueryPlan plan) {
+        return executionService.executePlan(plan, createContext());
+    }
+
+    /**
+     * Prepares (aka parses, validates, and optimizes) the given query string
+     * and returns the plan to execute.
+     *
+     * @param query A query string to prepare.
+     * @return A plan to execute.
+     */
+    public QueryPlan prepare(String query) {
+        SqlNodeList nodes = Commons.parse(query, FRAMEWORK_CONFIG.getParserConfig());
+
+        assertThat(nodes, hasSize(1));
+
+        return prepareService.prepareAsync(nodes.get(0), createContext()).join();
+    }
+
+    private BaseQueryContext createContext() {
+        return BaseQueryContext.builder()
+                .cancel(new QueryCancel())
+                .frameworkConfig(
+                        Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                                .defaultSchema(schema)
+                                .build()
+                )
+                .build();
+    }
+
+    private <T extends LifecycleAware> T registerService(T service) {
+        services.add(service);
+
+        return service;
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
new file mode 100644
index 0000000000..baa194c5bd
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.core.TableModify.Operation;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
+import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.ModifyRow;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.table.InternalTable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A test table that implements all the necessary for the optimizer methods to be used
+ * to prepare a query, as well as provides access to the data to use this table in
+ * execution-related scenarios.
+ */
+public class TestTable implements InternalIgniteTable {
+    private static final String DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE =
+            "DataProvider is not configured [table={}, node={}]";
+
+    private final UUID id = UUID.randomUUID();
+    private final Map<String, IgniteIndex> indexes = new HashMap<>();
+
+    private final String name;
+    private final double rowCnt;
+    private final ColocationGroup colocationGroup;
+    private final TableDescriptor descriptor;
+    private final Map<String, DataProvider<?>> dataProviders;
+
+
+    /** Constructor. */
+    public TestTable(
+            TableDescriptor descriptor,
+            String name,
+            ColocationGroup colocationGroup,
+            double rowCnt
+    ) {
+        this.descriptor = descriptor;
+        this.name = name;
+        this.rowCnt = rowCnt;
+        this.colocationGroup = colocationGroup;
+
+        dataProviders = Collections.emptyMap();
+    }
+
+    /** Constructor. */
+    public TestTable(
+            TableDescriptor descriptor,
+            String name,
+            Map<String, DataProvider<?>> dataProviders,
+            double rowCnt
+    ) {
+        this.descriptor = descriptor;
+        this.name = name;
+        this.rowCnt = rowCnt;
+        this.dataProviders = dataProviders;
+
+        this.colocationGroup = ColocationGroup.forNodes(List.copyOf(dataProviders.keySet()));
+    }
+
+    /**
+     * Returns the data provider for the given node.
+     *
+     * @param nodeName Name of the node of interest.
+     * @param <RowT> A type of the rows the data provider should produce.
+     * @return A data provider for the node of interest.
+     * @throws AssertionError in case data provider is not configured for the given node.
+     */
+    <RowT> DataProvider<RowT> dataProvider(String nodeName) {
+        if (!dataProviders.containsKey(nodeName)) {
+            throw new AssertionError(format(DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE, name, nodeName));
+        }
+
+        return (DataProvider<RowT>) dataProviders.get(nodeName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int version() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteLogicalTableScan toRel(
+            RelOptCluster cluster,
+            RelOptTable relOptTbl,
+            List<RelHint> hints,
+            @Nullable List<RexNode> proj,
+            @Nullable RexNode cond,
+            @Nullable ImmutableBitSet requiredColumns
+    ) {
+        return IgniteLogicalTableScan.create(cluster, cluster.traitSet(), hints, relOptTbl, proj, cond, requiredColumns);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteLogicalIndexScan toRel(
+            RelOptCluster cluster,
+            RelOptTable relOptTbl,
+            String idxName,
+            @Nullable List<RexNode> proj,
+            @Nullable RexNode cond,
+            @Nullable ImmutableBitSet requiredColumns
+    ) {
+        return IgniteLogicalIndexScan.create(cluster, cluster.traitSet(), relOptTbl, idxName, proj, cond, requiredColumns);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
+        return descriptor.rowType((IgniteTypeFactory) typeFactory, bitSet);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Statistic getStatistic() {
+        return new Statistic() {
+            /** {@inheritDoc} */
+            @Override
+            public Double getRowCount() {
+                return rowCnt;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public boolean isKey(ImmutableBitSet cols) {
+                return false;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public List<ImmutableBitSet> getKeys() {
+                throw new AssertionError();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public List<RelReferentialConstraint> getReferentialConstraints() {
+                throw new AssertionError();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public List<RelCollation> getCollations() {
+                return Collections.emptyList();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public RelDistribution getDistribution() {
+                throw new AssertionError();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Schema.TableType getJdbcTableType() {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isRolledUp(String col) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean rolledUpColumnValidInsideAgg(
+            String column,
+            SqlCall call,
+            SqlNode parent,
+            CalciteConnectionConfig config
+    ) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ColocationGroup colocationGroup(MappingQueryContext ctx) {
+        return colocationGroup;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteDistribution distribution() {
+        return descriptor.distribution();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public TableDescriptor descriptor() {
+        return descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Map<String, IgniteIndex> indexes() {
+        return Collections.unmodifiableMap(indexes);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addIndex(IgniteIndex idxTbl) {
+        indexes.put(idxTbl.name(), idxTbl);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteIndex getIndex(String idxName) {
+        return indexes.get(idxName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removeIndex(String idxName) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public InternalTable table() {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow row, RowFactory<RowT> factory,
+            @Nullable BitSet requiredColumns) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> ModifyRow toModifyRow(ExecutionContext<RowT> ectx, RowT row, Operation op, @Nullable List<String> arg) {
+        throw new AssertionError();
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 9d6fb2e7b1..3b4040bcd6 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -42,7 +42,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -95,6 +94,7 @@ import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
+import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.prepare.Cloner;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
@@ -112,10 +112,8 @@ import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.ModifyRow;
-import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -666,17 +664,8 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
 
         List<RelNode> deserializedNodes = new ArrayList<>();
 
-        Map<UUID, IgniteTable> tableMap = new HashMap<>();
-
-        for (IgniteSchema schema : schemas) {
-            tableMap.putAll(schema.getTableNames().stream()
-                    .map(schema::getTable)
-                    .map(IgniteTable.class::cast)
-                    .collect(Collectors.toMap(IgniteTable::id, Function.identity())));
-        }
-
         for (String s : serialized) {
-            RelJsonReader reader = new RelJsonReader(new SqlSchemaManagerImpl(tableMap));
+            RelJsonReader reader = new RelJsonReader(new PredefinedSchemaManager(schemas));
             deserializedNodes.add(reader.read(s));
         }
 
@@ -1149,24 +1138,6 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
         }
     }
 
-    static class SqlSchemaManagerImpl implements SqlSchemaManager {
-        private final Map<UUID, IgniteTable> tablesById;
-
-        public SqlSchemaManagerImpl(Map<UUID, IgniteTable> tablesById) {
-            this.tablesById = tablesById;
-        }
-
-        @Override
-        public SchemaPlus schema(@Nullable String schema) {
-            throw new AssertionError();
-        }
-
-        @Override
-        public IgniteTable tableById(UUID id, int ver) {
-            return tablesById.get(id);
-        }
-    }
-
     static class TestSortedIndex implements SortedIndex {
         private final UUID id = UUID.randomUUID();