You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2023/01/11 06:58:09 UTC
[ignite-3] branch main updated: IGNITE-17275 Sql. Performance testing of pure SQL execution (#1483)
This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cb7cade340 IGNITE-17275 Sql. Performance testing of pure SQL execution (#1483)
cb7cade340 is described below
commit cb7cade340edd971ba7c8924fc8e76a0240d9db4
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Wed Jan 11 08:58:03 2023 +0200
IGNITE-17275 Sql. Performance testing of pure SQL execution (#1483)
---
.../internal/testframework/IgniteTestUtils.java | 2 +-
modules/sql-engine/build.gradle | 2 +
modules/sql-engine/pom.xml | 25 ++
.../internal/sql/engine/SqlQueryProcessor.java | 10 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 28 +-
.../sql/engine/schema/ColumnDescriptorImpl.java | 3 +-
.../internal/sql/engine/schema/IgniteSchema.java | 21 +-
.../sql/engine/benchmarks/SqlBenchmark.java | 150 +++++++++
.../sql/engine/exec/ExecutionServiceImplTest.java | 40 +--
.../sql/engine/framework/DataProvider.java | 43 +++
.../engine/framework/PredefinedSchemaManager.java | 80 +++++
.../sql/engine/framework/TestBuilders.java | 351 +++++++++++++++++++++
.../internal/sql/engine/framework/TestCluster.java | 66 ++++
.../sql/engine/framework/TestClusterService.java | 198 ++++++++++++
.../internal/sql/engine/framework/TestNode.java | 199 ++++++++++++
.../internal/sql/engine/framework/TestTable.java | 300 ++++++++++++++++++
.../sql/engine/planner/AbstractPlannerTest.java | 33 +-
17 files changed, 1473 insertions(+), 78 deletions(-)
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index a7a700a0ed..aae2063c3d 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -58,7 +58,7 @@ import org.junit.jupiter.api.TestInfo;
public final class IgniteTestUtils {
private static final IgniteLogger LOG = Loggers.forClass(IgniteTestUtils.class);
- private static final int TIMEOUT_SEC = 5000;
+ private static final int TIMEOUT_SEC = 30;
/**
* Set object field value via reflection.
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 30999cac1e..117e58d885 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -54,6 +54,7 @@ dependencies {
annotationProcessor libs.value.annotation.processor
testAnnotationProcessor project(':ignite-network-annotation-processor')
+ testAnnotationProcessor libs.jmh.annotation.processor
testAnnotationProcessor libs.value.annotation.processor
testImplementation project(':ignite-core')
testImplementation project(':ignite-baseline')
@@ -65,6 +66,7 @@ dependencies {
testImplementation project(':ignite-storage-rocksdb')
testImplementation project(':ignite-cluster-management')
testImplementation project(':ignite-vault')
+ testImplementation libs.jmh.core
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-storage-api')))
diff --git a/modules/sql-engine/pom.xml b/modules/sql-engine/pom.xml
index 6ad7ca90dc..4066df87cf 100644
--- a/modules/sql-engine/pom.xml
+++ b/modules/sql-engine/pom.xml
@@ -208,6 +208,19 @@
<artifactId>slf4j-jdk14</artifactId>
<scope>test</scope>
</dependency>
+
+ <!-- Benchmark dependencies -->
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -313,6 +326,12 @@
<artifactId>value</artifactId>
<version>${immutables.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.framework.version}</version>
+ </dependency>
</dependencies>
<configuration>
<!-- <compilerArgs>
@@ -330,6 +349,12 @@
<artifactId>value</artifactId>
<version>${immutables.version}</version>
</path>
+
+ <path>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.framework.version}</version>
+ </path>
</annotationProcessorPaths>
</configuration>
</plugin>
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 0ce7773d9e..eecaf29519 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
@@ -210,18 +211,17 @@ public class SqlQueryProcessor implements QueryProcessor {
this.prepareSvc = prepareSvc;
+ var ddlCommandHandler = new DdlCommandHandler(distributionZoneManager, tableManager, indexManager, dataStorageManager);
+
var executionSrvc = registerService(ExecutionServiceImpl.create(
clusterSrvc.topologyService(),
msgSrvc,
sqlSchemaManager,
- distributionZoneManager,
- tableManager,
- indexManager,
+ ddlCommandHandler,
taskExecutor,
ArrayRowHandler.INSTANCE,
mailboxRegistry,
- exchangeService,
- dataStorageManager
+ exchangeService
));
clusterSrvc.topologyService().addEventHandler(executionSrvc);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 37a243efce..9b4d7027eb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -39,9 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.configuration.ConfigurationChangeException;
-import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.AsyncCursor;
@@ -72,8 +70,6 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
-import org.apache.ignite.internal.storage.DataStorageManager;
-import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
@@ -117,13 +113,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
* @param topSrvc Topology service.
* @param msgSrvc Message service.
* @param sqlSchemaManager Schema manager.
- * @param indexManager Index manager.
- * @param tblManager Table manager.
+ * @param ddlCommandHandler Handler of the DDL commands.
* @param taskExecutor Task executor.
* @param handler Row handler.
* @param mailboxRegistry Mailbox registry.
* @param exchangeSrvc Exchange service.
- * @param dataStorageManager Storage manager.
* @param <RowT> Type of the sql row.
* @return An execution service.
*/
@@ -131,21 +125,18 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
TopologyService topSrvc,
MessageService msgSrvc,
SqlSchemaManager sqlSchemaManager,
- DistributionZoneManager distributionZoneManager,
- TableManager tblManager,
- IndexManager indexManager,
+ DdlCommandHandler ddlCommandHandler,
QueryTaskExecutor taskExecutor,
RowHandler<RowT> handler,
MailboxRegistry mailboxRegistry,
- ExchangeService exchangeSrvc,
- DataStorageManager dataStorageManager
+ ExchangeService exchangeSrvc
) {
return new ExecutionServiceImpl<>(
topSrvc.localMember(),
msgSrvc,
new MappingServiceImpl(topSrvc),
sqlSchemaManager,
- new DdlCommandHandler(distributionZoneManager, tblManager, indexManager, dataStorageManager),
+ ddlCommandHandler,
taskExecutor,
handler,
exchangeSrvc,
@@ -161,7 +152,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
/**
* Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- ExecutionServiceImpl(
+ public ExecutionServiceImpl(
ClusterNode localNode,
MessageService msgSrvc,
MappingService mappingSrvc,
@@ -718,8 +709,15 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
}
}
+ /**
+ * A factory of the relational node implementors.
+ *
+ * @param <RowT> A type of the row the execution tree will be working with.
+ * @see LogicalRelImplementor
+ */
@FunctionalInterface
- interface ImplementorFactory<RowT> {
+ public interface ImplementorFactory<RowT> {
+ /** Creates the relational node implementor with the given context. */
LogicalRelImplementor<RowT> create(ExecutionContext<RowT> ctx);
}
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
index 76f56d23a3..2dcc455ecb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.schema;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.ignite.internal.schema.NativeType;
+import org.jetbrains.annotations.Nullable;
/**
* Simple implementation of {@link ColumnDescriptor}.
@@ -65,7 +66,7 @@ public class ColumnDescriptorImpl implements ColumnDescriptor {
int physicalIndex,
NativeType type,
DefaultValueStrategy defaultStrategy,
- Supplier<Object> dfltVal
+ @Nullable Supplier<Object> dfltVal
) {
this.key = key;
this.nullable = nullable;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 52aca0781d..f989c9bfeb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
+import org.jetbrains.annotations.Nullable;
/**
* Ignite schema.
@@ -35,20 +36,26 @@ public class IgniteSchema extends AbstractSchema {
private final Map<UUID, IgniteIndex> idxMap;
/**
- * Creates a Schema.
+ * Creates a Schema with given tables and indexes.
*
- * @param schemaName Schema name.
+ * @param schemaName A name of the schema to create.
+ * @param tableMap A collection of a tables belonging to the schema.
+ * @param indexMap A collection of an indexes belonging to the schema.
*/
- public IgniteSchema(String schemaName, Map<String, Table> tblMap, Map<UUID, IgniteIndex> idxMap) {
+ public IgniteSchema(
+ String schemaName,
+ @Nullable Map<String, Table> tableMap,
+ @Nullable Map<UUID, IgniteIndex> indexMap
+ ) {
this.schemaName = schemaName;
- this.tblMap = tblMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(tblMap);
- this.idxMap = idxMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(idxMap);
+ this.tblMap = tableMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(tableMap);
+ this.idxMap = indexMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(indexMap);
}
/**
- * Creates a Schema.
+ * Creates an empty Schema.
*
- * @param schemaName Schema name.
+ * @param schemaName A name of the schema to create.
*/
public IgniteSchema(String schemaName) {
this(schemaName, null, null);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
new file mode 100644
index 0000000000..7a5d239c77
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.benchmarks;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.sql.engine.framework.DataProvider;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * A micro-benchmark of sql execution.
+ */
+@Warmup(iterations = 20, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 20, time = 1, timeUnit = TimeUnit.SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Benchmark)
+public class SqlBenchmark {
+ private final DataProvider<Object[]> dataProvider = new SameRowDataProvider(
+ new Object[] {42, UUID.randomUUID().toString()}, 333
+ );
+
+ // @formatter:off
+ private final TestCluster cluster = TestBuilders.cluster()
+ .nodes("N1", "N2", "N3")
+ .addTable()
+ .name("T1")
+ .distribution(IgniteDistributions.hash(List.of(0)))
+ .addColumn("ID", NativeTypes.INT32)
+ .addColumn("VAL", NativeTypes.stringOf(64))
+ .defaultDataProvider(dataProvider)
+ .end()
+ .build();
+ // @formatter:on
+
+ private final TestNode gatewayNode = cluster.node("N1");
+
+ private QueryPlan plan;
+
+ /** Starts the cluster and prepares the plan of the query. */
+ @Setup
+ public void setUp() {
+ cluster.start();
+
+ plan = gatewayNode.prepare("SELECT * FROM t1");
+ }
+
+ /** Stops the cluster. */
+ @TearDown
+ public void tearDown() throws Exception {
+ cluster.stop();
+ }
+
+ /** Very simple test to measure performance of minimal possible distributed query. */
+ @Benchmark
+ public void selectAllSimple(Blackhole bh) {
+ for (var row : await(gatewayNode.executePlan(plan).requestNextAsync(1_000)).items()) {
+ bh.consume(row);
+ }
+ }
+
+ /**
+ * Runs the benchmark.
+ *
+ * @param args args
+ * @throws Exception if something goes wrong
+ */
+ public static void main(String[] args) throws Exception {
+ Options build = new OptionsBuilder()
+ //.addProfiler("gc")
+ .include(SqlBenchmark.class.getName())
+ .build();
+
+ new Runner(build).run();
+ }
+
+ private static class SameRowDataProvider implements DataProvider<Object[]> {
+ private final int times;
+ private final Object[] row;
+
+ SameRowDataProvider(Object[] row, int times) {
+ this.times = times;
+ this.row = row;
+ }
+
+ @Override
+ public Iterator<Object[]> iterator() {
+ return new Iterator<>() {
+ private int counter;
+
+ @Override
+ public boolean hasNext() {
+ return counter < times;
+ }
+
+ @Override
+ public Object[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ counter++;
+
+ return row;
+ }
+ };
+ }
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 92e963db86..49771678a9 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine.exec;
-import static org.apache.ignite.internal.sql.engine.util.BaseQueryContext.CLUSTER;
import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -35,6 +34,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -45,18 +45,20 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.rel.Node;
import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
+import org.apache.ignite.internal.sql.engine.framework.TestTable;
import org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
import org.apache.ignite.internal.sql.engine.message.MessageListener;
import org.apache.ignite.internal.sql.engine.message.MessageService;
@@ -64,14 +66,16 @@ import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
-import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTable;
-import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -108,7 +112,7 @@ public class ExecutionServiceImplTest {
);
private final TestTable table = createTable("TEST_TBL", 1_000_000, IgniteDistributions.random(),
- "ID", Integer.class, "VAL", Integer.class);
+ "ID", NativeTypes.INT32, "VAL", NativeTypes.INT32);
private final IgniteSchema schema = new IgniteSchema("PUBLIC", Map.of(table.name(), table), null);
@@ -588,22 +592,22 @@ public class ExecutionServiceImplTest {
throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
}
- RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(CLUSTER.getTypeFactory());
+ List<ColumnDescriptor> columns = new ArrayList<>();
for (int i = 0; i < fields.length; i += 2) {
- b.add((String) fields[i], CLUSTER.getTypeFactory().createJavaType((Class<?>) fields[i + 1]));
+ columns.add(
+ new ColumnDescriptorImpl(
+ (String) fields[i], false, true, i, i,
+ (NativeType) fields[i + 1], DefaultValueStrategy.DEFAULT_NULL, null
+ )
+ );
}
- return new TestTable(name, b.build(), size) {
- @Override
- public IgniteDistribution distribution() {
- return distr;
- }
-
- @Override
- public ColocationGroup colocationGroup(MappingQueryContext ctx) {
- return ColocationGroup.forNodes(nodeNames);
- }
- };
+ return new TestTable(
+ new TableDescriptorImpl(columns, distr),
+ name,
+ ColocationGroup.forNodes(nodeNames),
+ size
+ );
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
new file mode 100644
index 0000000000..a143f4c6ae
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+
+/**
+ * Producer of the rows to use with {@link TestTable} in execution-related scenarios.
+ *
+ * <p>A data provider is supposed to be created for table on per-node basis. It's up
+ * to developer to keep data provider in sync with the schema of the table this data provider relates to.
+ *
+ * @param <T> A type of the produced elements.
+ * @see TestTable
+ */
+@FunctionalInterface
+public interface DataProvider<T> extends Iterable<T> {
+ /**
+ * Creates data provider from given collection.
+ *
+ * @param collection Collection to use as source of data.
+ * @param <T> A type of the produced elements.
+ * @return A data provider instance backed by given collection.
+ */
+ static <T> DataProvider<T> fromCollection(Collection<T> collection) {
+ return collection::iterator;
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
new file mode 100644
index 0000000000..bd9e9bc4ce
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy wrapper for predefined collection of schemas.
+ *
+ * <p>Accepts collection of {@link IgniteSchema schemas} as parameter and implements required
+ * methods of {@link SqlSchemaManager} around them. Assumes given schemas will never be changed.
+ *
+ * @see IgniteSchema
+ * @see SqlSchemaManager
+ */
+public class PredefinedSchemaManager implements SqlSchemaManager {
+ private final SchemaPlus root;
+ private final Map<UUID, IgniteTable> tableById;
+
+ /** Constructs schema manager from a single schema. */
+ public PredefinedSchemaManager(IgniteSchema schema) {
+ this(List.of(schema));
+ }
+
+ /** Constructs schema manager from a collection of schemas. */
+ public PredefinedSchemaManager(Collection<IgniteSchema> schemas) {
+ this.root = Frameworks.createRootSchema(false);
+ this.tableById = new HashMap<>();
+
+ for (IgniteSchema schema : schemas) {
+ root.add(schema.getName(), schema);
+
+ tableById.putAll(
+ schema.getTableNames().stream()
+ .map(schema::getTable)
+ .map(IgniteTable.class::cast)
+ .collect(Collectors.toMap(IgniteTable::id, Function.identity()))
+ );
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SchemaPlus schema(@Nullable String schema) {
+ return schema == null ? root : root.getSubSchema(schema);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteTable tableById(UUID id, int ver) {
+ return tableById.get(id);
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
new file mode 100644
index 0000000000..6a6ae1bb52
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.Table;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+
+/**
+ * A collection of builders to create test objects.
+ */
+public class TestBuilders {
+ /** Returns a builder of the test cluster object. */
+ public static ClusterBuilder cluster() {
+ return new ClusterBuilderImpl();
+ }
+
+ /** Returns a builder of the test table object. */
+ public static TableBuilder table() {
+ return new TableBuilderImpl();
+ }
+
+ /**
+ * A builder to create a test cluster object.
+ *
+ * @see TestCluster
+ */
+ public interface ClusterBuilder {
+ /**
+ * Sets desired names for the cluster nodes.
+ *
+ * @param firstNodeName A name of the first node. There is no difference in what node should be first. This parameter was introduced
+ * to force user to provide at least one node name.
+ * @param otherNodeNames An array of rest of the names to create cluster from.
+ * @return {@code this} for chaining.
+ */
+ ClusterBuilder nodes(String firstNodeName, String... otherNodeNames);
+
+ /**
+ * Creates a table builder to add to the cluster.
+ *
+ * @return An instance of table builder.
+ */
+ ClusterTableBuilder addTable();
+
+ /**
+ * Builds the cluster object.
+ *
+ * @return Created cluster object.
+ */
+ TestCluster build();
+ }
+
+ /**
+ * A builder to create a test table object.
+ *
+ * @see TestTable
+ */
+ public interface TableBuilder extends TableBuilderBase<TableBuilder> {
+ /**
+ * Builds a table.
+ *
+ * @return Created table object.
+ */
+ public TestTable build();
+ }
+
+ /**
+ * A builder to create a test table as nested object of the cluster.
+ *
+ * @see TestTable
+ * @see TestCluster
+ */
+ public interface ClusterTableBuilder extends TableBuilderBase<ClusterTableBuilder>, NestedBuilder<ClusterBuilder> {
+ /**
+ * Adds a default data provider, which will be used for those nodes for which no specific provider is specified.
+ *
+ * <p>Note: this method will force all nodes in the cluster to have a data provider for the given table.
+ */
+ ClusterTableBuilder defaultDataProvider(DataProvider<?> dataProvider);
+ }
+
+ private static class ClusterBuilderImpl implements ClusterBuilder {
+ private final List<ClusterTableBuilderImpl> tableBuilders = new ArrayList<>();
+ private List<String> nodeNames;
+
+ /** {@inheritDoc} */
+ @Override
+ public ClusterBuilder nodes(String firstNodeName, String... otherNodeNames) {
+ this.nodeNames = new ArrayList<>();
+
+ nodeNames.add(firstNodeName);
+ nodeNames.addAll(Arrays.asList(otherNodeNames));
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ClusterTableBuilder addTable() {
+ return new ClusterTableBuilderImpl(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TestCluster build() {
+ var clusterService = new TestClusterService(nodeNames);
+
+ for (ClusterTableBuilderImpl tableBuilder : tableBuilders) {
+ validateTableBuilder(tableBuilder);
+ injectDataProvidersIfNeeded(tableBuilder);
+ }
+
+ Map<String, Table> tableMap = tableBuilders.stream()
+ .map(ClusterTableBuilderImpl::build)
+ .collect(Collectors.toMap(TestTable::name, Function.identity()));
+
+ var schemaManager = new PredefinedSchemaManager(new IgniteSchema("PUBLIC", tableMap, null));
+
+ Map<String, TestNode> nodes = nodeNames.stream()
+ .map(name -> new TestNode(name, clusterService.spawnForNode(name), schemaManager))
+ .collect(Collectors.toMap(TestNode::name, Function.identity()));
+
+ return new TestCluster(nodes);
+ }
+
+ private void validateTableBuilder(ClusterTableBuilderImpl tableBuilder) {
+ Set<String> tableOwners = new HashSet<>(tableBuilder.dataProviders.keySet());
+
+ tableOwners.removeAll(nodeNames);
+
+ if (!tableOwners.isEmpty()) {
+ throw new AssertionError(format("The table has a dataProvider that is outside the cluster "
+ + "[tableName={}, outsiders={}]", tableBuilder.name, tableOwners));
+ }
+ }
+
+ private void injectDataProvidersIfNeeded(ClusterTableBuilderImpl tableBuilder) {
+ if (tableBuilder.defaultDataProvider == null) {
+ return;
+ }
+
+ Set<String> nodesWithoutDataProvider = new HashSet<>(nodeNames);
+
+ nodesWithoutDataProvider.removeAll(tableBuilder.dataProviders.keySet());
+
+ for (String name : nodesWithoutDataProvider) {
+ tableBuilder.addDataProvider(name, tableBuilder.defaultDataProvider);
+ }
+ }
+ }
+
+ private static class TableBuilderImpl extends AbstractTableBuilderImpl<TableBuilder> implements TableBuilder {
+ /** {@inheritDoc} */
+ @Override
+ public TestTable build() {
+ return new TestTable(
+ new TableDescriptorImpl(columns, distribution), name, dataProviders, size
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected TableBuilder self() {
+ return this;
+ }
+ }
+
+ private static class ClusterTableBuilderImpl extends AbstractTableBuilderImpl<ClusterTableBuilder> implements ClusterTableBuilder {
+ private final ClusterBuilderImpl parent;
+
+ private DataProvider<?> defaultDataProvider = null;
+
+ private ClusterTableBuilderImpl(ClusterBuilderImpl parent) {
+ this.parent = parent;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected ClusterTableBuilder self() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ClusterTableBuilder defaultDataProvider(DataProvider<?> dataProvider) {
+ this.defaultDataProvider = dataProvider;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ClusterBuilder end() {
+ parent.tableBuilders.add(this);
+
+ return parent;
+ }
+
+ private TestTable build() {
+ return new TestTable(
+ new TableDescriptorImpl(columns, distribution), name, dataProviders, size
+ );
+ }
+ }
+
+ private abstract static class AbstractTableBuilderImpl<ChildT> implements TableBuilderBase<ChildT> {
+ protected final List<ColumnDescriptor> columns = new ArrayList<>();
+ protected final Map<String, DataProvider<?>> dataProviders = new HashMap<>();
+
+ protected String name;
+ protected IgniteDistribution distribution;
+ protected int size = 100_000;
+
+ protected abstract ChildT self();
+
+ /** {@inheritDoc} */
+ @Override
+ public ChildT name(String name) {
+ this.name = name;
+
+ return self();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChildT distribution(IgniteDistribution distribution) {
+ this.distribution = distribution;
+
+ return self();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChildT addColumn(String name, NativeType type) {
+ columns.add(new ColumnDescriptorImpl(
+ name, false, true, columns.size(), columns.size(), type, DefaultValueStrategy.DEFAULT_NULL, null
+ ));
+
+ return self();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChildT addDataProvider(String targetNode, DataProvider<?> dataProvider) {
+ this.dataProviders.put(targetNode, dataProvider);
+
+ return self();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChildT size(int size) {
+ this.size = size;
+
+ return self();
+ }
+ }
+
+ /**
+ * Base interface describing the complete set of table-related fields.
+ *
+ * <p>The sole purpose of this interface is to keep in sync both variants of table's builders.
+ *
+ * @param <ChildT> An actual type of builder that should be exposed to the user.
+ * @see ClusterTableBuilder
+ * @see TableBuilder
+ */
+ private interface TableBuilderBase<ChildT> {
+ /** Sets the name of the table. */
+ ChildT name(String name);
+
+ /** Sets the distribution of the table. */
+ ChildT distribution(IgniteDistribution distribution);
+
+ /** Adds a column to the table. */
+ ChildT addColumn(String name, NativeType type);
+
+ /** Adds a data provider for the given node to the table. */
+ ChildT addDataProvider(String targetNode, DataProvider<?> dataProvider);
+
+ /** Sets the size of the table. */
+ ChildT size(int size);
+ }
+
+ /**
+ * This interfaces provides a nested builder with ability to return on the previous layer.
+ *
+ * <p>For example:</p>
+ * <pre>
+ * interface ChildBuilder implements NestedBuilder<ParentBuilder> {
+ * ChildBuilder nestedFoo();
+ * }
+ *
+ * interface ParentBuilder {
+ * ParentBuilder foo();
+ * ParentBuilder bar();
+ * ChildBuilder child();
+ * }
+ *
+ * Builders.parent()
+ * .foo()
+ * .child() // now we are dealing with the ChildBuilder
+ * .nestedFoo()
+ * .end() // and here we are returning back to the ParentBuilder
+ * .bar()
+ * .build()
+ * </pre>
+ */
+ @FunctionalInterface
+ private interface NestedBuilder<ParentT> {
+ /**
+ * Notifies the builder's chain of the nested builder that we need to return back to the
+ * previous layer.
+ *
+ * @return An instance of the parent builder.
+ */
+ ParentT end();
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
new file mode 100644
index 0000000000..9a660217be
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * A test cluster object.
+ *
+ * <p>This is convenient holder of collection of nodes which provides methods for centralised
+ * accessing and management.
+ *
+ * <p>NB: do not forget to {@link #start()} cluster before use, and {@link #stop()} the cluster after.
+ */
+public class TestCluster implements LifecycleAware {
+ private final Map<String, TestNode> nodeByName;
+
+ TestCluster(Map<String, TestNode> nodeByName) {
+ this.nodeByName = nodeByName;
+ }
+
+ /**
+ * Returns the node for the given name, if exists.
+ *
+ * @param name A name of the node of interest.
+ * @return A test node or {@code null} if there is no node with such name.
+ */
+ public TestNode node(String name) {
+ return nodeByName.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ nodeByName.values().forEach(TestNode::start);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ List<AutoCloseable> closeables = nodeByName.values().stream()
+ .map(node -> ((AutoCloseable) node::stop))
+ .collect(Collectors.toList());
+
+ IgniteUtils.closeAll(closeables);
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
new file mode 100644
index 0000000000..a3cf08296d
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.AbstractMessagingService;
+import org.apache.ignite.network.AbstractTopologyService;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.NodeMetadata;
+import org.apache.ignite.network.TopologyService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Auxiliary object to create the associated {@link MessagingService}
+ * and {@link TopologyService} for each node in the cluster.
+ */
+class TestClusterService {
+ private final List<String> allNodes;
+
+ private final Map<String, LocalMessagingService> messagingServicesByNode = new ConcurrentHashMap<>();
+ private final Map<String, LocalTopologyService> topologyServicesByNode = new ConcurrentHashMap<>();
+
+ /**
+ * Creates a cluster service object for given collection of nodes.
+ *
+ * @param allNodes A collection of nodes to create cluster service from.
+ */
+ TestClusterService(List<String> allNodes) {
+ this.allNodes = allNodes;
+ }
+
+ ClusterService spawnForNode(String nodeName) {
+ return new ClusterService() {
+ /** {@inheritDoc} */
+ @Override
+ public TopologyService topologyService() {
+ return topologyServicesByNode.computeIfAbsent(nodeName, name -> new LocalTopologyService(name, allNodes));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public MessagingService messagingService() {
+ return messagingServicesByNode.computeIfAbsent(nodeName, LocalMessagingService::new);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ClusterLocalConfiguration localConfiguration() {
+ throw new AssertionError("Should not be called");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void updateMetadata(NodeMetadata metadata) {
+ throw new AssertionError("Should not be called");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+
+ }
+ };
+ }
+
+ private static class LocalTopologyService extends AbstractTopologyService {
+ private static final AtomicInteger NODE_COUNTER = new AtomicInteger(1);
+
+ private final ClusterNode localMember;
+ private final Map<String, ClusterNode> allMembers;
+ private final Map<NetworkAddress, ClusterNode> allMembersByAddress;
+
+ private LocalTopologyService(String localMember, List<String> allMembers) {
+ this.allMembers = allMembers.stream()
+ .map(LocalTopologyService::nodeFromName)
+ .collect(Collectors.toMap(ClusterNode::name, Function.identity()));
+
+ this.localMember = this.allMembers.get(localMember);
+
+ if (this.localMember == null) {
+ throw new IllegalArgumentException("Local member is not part of all members");
+ }
+
+ this.allMembersByAddress = new HashMap<>();
+
+ this.allMembers.forEach((ignored, member) -> allMembersByAddress.put(member.address(), member));
+ }
+
+ private static ClusterNode nodeFromName(String name) {
+ return new ClusterNode(name, name, NetworkAddress.from("127.0.0.1:" + NODE_COUNTER.incrementAndGet()));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ClusterNode localMember() {
+ return localMember;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<ClusterNode> allMembers() {
+ return allMembers.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable ClusterNode getByAddress(NetworkAddress addr) {
+ return allMembersByAddress.get(addr);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable ClusterNode getByConsistentId(String consistentId) {
+ return allMembers.get(consistentId);
+ }
+ }
+
+ private class LocalMessagingService extends AbstractMessagingService {
+ private final String localNodeName;
+
+ private LocalMessagingService(String localNodeName) {
+ this.localNodeName = localNodeName;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+ throw new AssertionError("Not implemented yet");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+ for (var handler : messagingServicesByNode.get(recipient.name()).messageHandlers(msg.groupType())) {
+ handler.onReceived(msg, localNodeName, null);
+ }
+
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
+ throw new AssertionError("Not implemented yet");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+ throw new AssertionError("Not implemented yet");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+ throw new AssertionError("Not implemented yet");
+ }
+
+ private Collection<NetworkMessageHandler> messageHandlers(short groupType) {
+ return getMessageHandlers(groupType);
+ }
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
new file mode 100644
index 0000000000..cba801716e
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.sql.engine.exec.LogicalRelImplementor;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.rel.Node;
+import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
+import org.apache.ignite.internal.sql.engine.message.MessageService;
+import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
+import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * An object representing a node in test cluster.
+ *
+ * <p>Provides convenient access to the methods for optimization and execution of the queries.
+ */
+public class TestNode implements LifecycleAware {
+ private final String nodeName;
+ private final SchemaPlus schema;
+ private final PrepareService prepareService;
+ private final ExecutionService executionService;
+
+ private final List<LifecycleAware> services = new ArrayList<>();
+
+ /**
+ * Constructs the object.
+ *
+ * @param nodeName A name of the node to create.
+ * @param clusterService A cluster service.
+ * @param schemaManager A schema manager to use for query planning and execution.
+ */
+ TestNode(
+ String nodeName,
+ ClusterService clusterService,
+ SqlSchemaManager schemaManager
+ ) {
+ this.nodeName = nodeName;
+ this.prepareService = registerService(new PrepareServiceImpl(nodeName, 0, mock(DdlSqlToCommandConverter.class)));
+ this.schema = schemaManager.schema("PUBLIC");
+
+ TopologyService topologyService = clusterService.topologyService();
+ MessagingService messagingService = clusterService.messagingService();
+ RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
+
+ MailboxRegistry mailboxRegistry = registerService(new MailboxRegistryImpl());
+ QueryTaskExecutor taskExecutor = registerService(new QueryTaskExecutorImpl(nodeName));
+
+ MessageService messageService = registerService(new MessageServiceImpl(
+ topologyService, messagingService, taskExecutor, new IgniteSpinBusyLock()
+ ));
+ ExchangeService exchangeService = registerService(new ExchangeServiceImpl(
+ topologyService.localMember(), taskExecutor, mailboxRegistry, messageService
+ ));
+
+ executionService = registerService(new ExecutionServiceImpl<>(
+ topologyService.localMember(),
+ messageService,
+ new MappingServiceImpl(topologyService),
+ schemaManager,
+ mock(DdlCommandHandler.class),
+ taskExecutor,
+ rowHandler,
+ exchangeService,
+ ctx -> new LogicalRelImplementor<Object[]>(
+ ctx,
+ new HashFunctionFactoryImpl<>(schemaManager, rowHandler),
+ mailboxRegistry,
+ exchangeService
+ ) {
+ @Override
+ public Node<Object[]> visit(IgniteTableScan rel) {
+ DataProvider<Object[]> dataProvider = rel.getTable().unwrap(TestTable.class).dataProvider(ctx.localNode().name());
+
+ return new ScanNode<>(ctx, dataProvider);
+ }
+ }
+ ));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ services.forEach(LifecycleAware::start);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ List<AutoCloseable> closeables = services.stream()
+ .map(service -> ((AutoCloseable) service::stop))
+ .collect(Collectors.toList());
+
+ Collections.reverse(closeables);
+ IgniteUtils.closeAll(closeables);
+ }
+
+ /** Returns the name of the current node. */
+ public String name() {
+ return nodeName;
+ }
+
+ /**
+ * Executes given plan on a cluster this node belongs to
+ * and returns an async cursor representing the result.
+ *
+ * @param plan A plan to execute.
+ * @return A cursor representing the result.
+ */
+ public AsyncCursor<List<Object>> executePlan(QueryPlan plan) {
+ return executionService.executePlan(plan, createContext());
+ }
+
+ /**
+ * Prepares (aka parses, validates, and optimizes) the given query string
+ * and returns the plan to execute.
+ *
+ * @param query A query string to prepare.
+ * @return A plan to execute.
+ */
+ public QueryPlan prepare(String query) {
+ SqlNodeList nodes = Commons.parse(query, FRAMEWORK_CONFIG.getParserConfig());
+
+ assertThat(nodes, hasSize(1));
+
+ return prepareService.prepareAsync(nodes.get(0), createContext()).join();
+ }
+
+ private BaseQueryContext createContext() {
+ return BaseQueryContext.builder()
+ .cancel(new QueryCancel())
+ .frameworkConfig(
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build()
+ )
+ .build();
+ }
+
+ private <T extends LifecycleAware> T registerService(T service) {
+ services.add(service);
+
+ return service;
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
new file mode 100644
index 0000000000..baa194c5bd
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.core.TableModify.Operation;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
+import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.ModifyRow;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.table.InternalTable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A test table that implements all the necessary for the optimizer methods to be used
+ * to prepare a query, as well as provides access to the data to use this table in
+ * execution-related scenarios.
+ */
+public class TestTable implements InternalIgniteTable {
+ private static final String DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE =
+ "DataProvider is not configured [table={}, node={}]";
+
+ private final UUID id = UUID.randomUUID();
+ private final Map<String, IgniteIndex> indexes = new HashMap<>();
+
+ private final String name;
+ private final double rowCnt;
+ private final ColocationGroup colocationGroup;
+ private final TableDescriptor descriptor;
+ private final Map<String, DataProvider<?>> dataProviders;
+
+
+ /** Constructor. */
+ public TestTable(
+ TableDescriptor descriptor,
+ String name,
+ ColocationGroup colocationGroup,
+ double rowCnt
+ ) {
+ this.descriptor = descriptor;
+ this.name = name;
+ this.rowCnt = rowCnt;
+ this.colocationGroup = colocationGroup;
+
+ dataProviders = Collections.emptyMap();
+ }
+
+ /** Constructor. */
+ public TestTable(
+ TableDescriptor descriptor,
+ String name,
+ Map<String, DataProvider<?>> dataProviders,
+ double rowCnt
+ ) {
+ this.descriptor = descriptor;
+ this.name = name;
+ this.rowCnt = rowCnt;
+ this.dataProviders = dataProviders;
+
+ this.colocationGroup = ColocationGroup.forNodes(List.copyOf(dataProviders.keySet()));
+ }
+
+ /**
+ * Returns the data provider for the given node.
+ *
+ * @param nodeName Name of the node of interest.
+ * @param <RowT> A type of the rows the data provider should produce.
+ * @return A data provider for the node of interest.
+ * @throws AssertionError in case data provider is not configured for the given node.
+ */
+ <RowT> DataProvider<RowT> dataProvider(String nodeName) {
+ if (!dataProviders.containsKey(nodeName)) {
+ throw new AssertionError(format(DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE, name, nodeName));
+ }
+
+ return (DataProvider<RowT>) dataProviders.get(nodeName);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int version() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteLogicalTableScan toRel(
+ RelOptCluster cluster,
+ RelOptTable relOptTbl,
+ List<RelHint> hints,
+ @Nullable List<RexNode> proj,
+ @Nullable RexNode cond,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ return IgniteLogicalTableScan.create(cluster, cluster.traitSet(), hints, relOptTbl, proj, cond, requiredColumns);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteLogicalIndexScan toRel(
+ RelOptCluster cluster,
+ RelOptTable relOptTbl,
+ String idxName,
+ @Nullable List<RexNode> proj,
+ @Nullable RexNode cond,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ return IgniteLogicalIndexScan.create(cluster, cluster.traitSet(), relOptTbl, idxName, proj, cond, requiredColumns);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
+ return descriptor.rowType((IgniteTypeFactory) typeFactory, bitSet);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Statistic getStatistic() {
+ return new Statistic() {
+ /** {@inheritDoc} */
+ @Override
+ public Double getRowCount() {
+ return rowCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isKey(ImmutableBitSet cols) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<ImmutableBitSet> getKeys() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<RelReferentialConstraint> getReferentialConstraints() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<RelCollation> getCollations() {
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelDistribution getDistribution() {
+ throw new AssertionError();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isRolledUp(String col) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean rolledUpColumnValidInsideAgg(
+ String column,
+ SqlCall call,
+ SqlNode parent,
+ CalciteConnectionConfig config
+ ) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ColocationGroup colocationGroup(MappingQueryContext ctx) {
+ return colocationGroup;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteDistribution distribution() {
+ return descriptor.distribution();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableDescriptor descriptor() {
+ return descriptor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Map<String, IgniteIndex> indexes() {
+ return Collections.unmodifiableMap(indexes);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void addIndex(IgniteIndex idxTbl) {
+ indexes.put(idxTbl.name(), idxTbl);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteIndex getIndex(String idxName) {
+ return indexes.get(idxName);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void removeIndex(String idxName) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public InternalTable table() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow row, RowFactory<RowT> factory,
+ @Nullable BitSet requiredColumns) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> ModifyRow toModifyRow(ExecutionContext<RowT> ectx, RowT row, Operation op, @Nullable List<String> arg) {
+ throw new AssertionError();
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 9d6fb2e7b1..3b4040bcd6 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -42,7 +42,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Flow.Publisher;
import java.util.function.BiFunction;
-import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -95,6 +94,7 @@ import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
+import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
import org.apache.ignite.internal.sql.engine.prepare.Cloner;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
@@ -112,10 +112,8 @@ import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.schema.ModifyRow;
-import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -666,17 +664,8 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
List<RelNode> deserializedNodes = new ArrayList<>();
- Map<UUID, IgniteTable> tableMap = new HashMap<>();
-
- for (IgniteSchema schema : schemas) {
- tableMap.putAll(schema.getTableNames().stream()
- .map(schema::getTable)
- .map(IgniteTable.class::cast)
- .collect(Collectors.toMap(IgniteTable::id, Function.identity())));
- }
-
for (String s : serialized) {
- RelJsonReader reader = new RelJsonReader(new SqlSchemaManagerImpl(tableMap));
+ RelJsonReader reader = new RelJsonReader(new PredefinedSchemaManager(schemas));
deserializedNodes.add(reader.read(s));
}
@@ -1149,24 +1138,6 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
}
}
- static class SqlSchemaManagerImpl implements SqlSchemaManager {
- private final Map<UUID, IgniteTable> tablesById;
-
- public SqlSchemaManagerImpl(Map<UUID, IgniteTable> tablesById) {
- this.tablesById = tablesById;
- }
-
- @Override
- public SchemaPlus schema(@Nullable String schema) {
- throw new AssertionError();
- }
-
- @Override
- public IgniteTable tableById(UUID id, int ver) {
- return tablesById.get(id);
- }
- }
-
static class TestSortedIndex implements SortedIndex {
private final UUID id = UUID.randomUUID();