You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/06/13 16:06:22 UTC

[ignite-3] branch ignite-17765-2 created (now 05cf5031dc)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a change to branch ignite-17765-2
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at 05cf5031dc fix

This branch includes the following new commits:

     new 31cabd5923 Merge branch 'main' into ignite-17765-2
     new 05cf5031dc fix

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-3] 01/02: Merge branch 'main' into ignite-17765-2

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17765-2
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 31cabd5923b5fa8b2992deeb8e3be64b31745f02
Merge: fd9a89dc2a 44c97c0065
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Jun 13 16:45:47 2023 +0300

    Merge branch 'main' into ignite-17765-2

 .../org/apache/ignite/compute/DeploymentUnit.java  |  10 +
 .../org/apache/ignite/compute/IgniteCompute.java   |  77 +--
 .../DistributionZoneAlreadyExistsException.java    |   3 +-
 .../lang}/DistributionZoneBindTableException.java  |   7 +-
 .../lang}/DistributionZoneNotFoundException.java   |   3 +-
 .../apache/ignite/table/DataStreamerOptions.java   | 176 ++++++
 .../apache/ignite/table/DataStreamerTarget.java}   |  34 +-
 .../java/org/apache/ignite/table/KeyValueView.java |   3 +-
 .../java/org/apache/ignite/table/RecordView.java   |   2 +-
 .../apache/ignite/internal/catalog/Catalog.java    |  87 ++-
 .../ignite/internal/catalog/CatalogManager.java    |  36 ++
 .../ignite/internal/catalog/CatalogService.java    |  32 +-
 .../internal/catalog/CatalogServiceImpl.java       | 454 +++++++++++---
 .../commands/AbstractZoneCommandParams.java        |  67 ++
 .../internal/catalog/commands/AlterZoneParams.java | 183 ++++++
 .../internal/catalog/commands/CatalogUtils.java    |  57 +-
 .../catalog/commands/CreateSortedIndexParams.java  |   8 +-
 .../catalog/commands/CreateZoneParams.java         | 191 ++++++
 .../internal/catalog/commands/DropZoneParams.java  |  72 +++
 .../RenameZoneParams.java}                         |  45 +-
 ...nCollation.java => CatalogColumnCollation.java} |   6 +-
 .../descriptors/CatalogDescriptorUtils.java        |  30 +-
 ...riptor.java => CatalogHashIndexDescriptor.java} |   4 +-
 ...ptor.java => CatalogIndexColumnDescriptor.java} |   8 +-
 ...Descriptor.java => CatalogIndexDescriptor.java} |   4 +-
 ...escriptor.java => CatalogObjectDescriptor.java} |   7 +-
 ...escriptor.java => CatalogSchemaDescriptor.java} |  36 +-
 ...ptor.java => CatalogSortedIndexDescriptor.java} |  10 +-
 ...ptor.java => CatalogTableColumnDescriptor.java} |   6 +-
 ...Descriptor.java => CatalogTableDescriptor.java} |  20 +-
 .../catalog/descriptors/CatalogZoneDescriptor.java | 133 ++++
 .../catalog/events/AddColumnEventParameters.java   |   8 +-
 .../catalog/events/AlterColumnEventParameters.java |   8 +-
 ...rameters.java => AlterZoneEventParameters.java} |  20 +-
 .../internal/catalog/events/CatalogEvent.java      |  11 +-
 .../catalog/events/CreateIndexEventParameters.java |   8 +-
 .../catalog/events/CreateTableEventParameters.java |   8 +-
 ...ameters.java => CreateZoneEventParameters.java} |  20 +-
 ...arameters.java => DropZoneEventParameters.java} |  22 +-
 .../internal/catalog/storage/AlterColumnEntry.java |   8 +-
 .../{NewTableEntry.java => AlterZoneEntry.java}    |  18 +-
 .../{NewIndexEntry.java => DropZoneEntry.java}     |  21 +-
 .../internal/catalog/storage/NewColumnsEntry.java  |   8 +-
 .../internal/catalog/storage/NewIndexEntry.java    |   8 +-
 .../internal/catalog/storage/NewTableEntry.java    |   8 +-
 .../{NewTableEntry.java => NewZoneEntry.java}      |  16 +-
 .../internal/catalog/storage/VersionedUpdate.java  |  11 +-
 .../internal/catalog/CatalogServiceSelfTest.java   | 532 ++++++++++++----
 .../catalog/storage/UpdateLogImplTest.java         |   6 +-
 .../ignite/internal/jdbc/JdbcConverterUtils.java}  |  38 +-
 .../internal/jdbc/proto/event/JdbcColumnMeta.java  |  35 +-
 .../client/handler/JdbcQueryCursorHandlerImpl.java |   4 +-
 .../ClientComputeExecuteColocatedRequest.java      |   3 +-
 .../compute/ClientComputeExecuteRequest.java       |   3 +-
 .../handler/requests/jdbc/JdbcMetadataCatalog.java |  52 +-
 .../apache/ignite/client/ClientOperationType.java  |   3 +-
 .../ignite/internal/client/ReliableChannel.java    |  25 +-
 .../ignite/internal/client/TcpClientChannel.java   |  12 +-
 .../internal/client/compute/ClientCompute.java     |  60 +-
 ...AbstractStreamerPartitionAwarenessProvider.java |  60 ++
 .../client/table/ClientKeyValueBinaryView.java     |  33 +-
 .../internal/client/table/ClientKeyValueView.java  |  37 ++
 .../client/table/ClientRecordBinaryView.java       |  31 +
 .../internal/client/table/ClientRecordView.java    |  32 +-
 .../ignite/internal/client/table/ClientTable.java  |  61 +-
 .../client/table/ClientTupleSerializer.java        |  15 +-
 ...luePojoStreamerPartitionAwarenessProvider.java} |  30 +-
 ...ueTupleStreamerPartitionAwarenessProvider.java} |  23 +-
 .../client/table/PartitionAwarenessProvider.java   |  17 +-
 .../PojoStreamerPartitionAwarenessProvider.java}   |  28 +-
 .../client/table/StreamerBatchSender.java}         |  28 +-
 .../internal/client/table/StreamerBuffer.java      |  98 +++
 .../table/StreamerPartitionAwarenessProvider.java} |  30 +-
 .../internal/client/table/StreamerSubscriber.java  | 253 ++++++++
 .../TupleStreamerPartitionAwarenessProvider.java}  |  20 +-
 .../apache/ignite/client/ClientComputeTest.java    |  25 +-
 .../apache/ignite/client/ConsoleLoggerFactory.java | 106 ++++
 .../org/apache/ignite/client/DataStreamerTest.java | 297 +++++++++
 .../ignite/client/PartitionAwarenessTest.java      | 168 ++++-
 .../apache/ignite/client/RequestBalancingTest.java |   2 +-
 .../apache/ignite/client/ServerMetricsTest.java    |  11 +-
 .../apache/ignite/client/fakes/FakeCompute.java    |  51 +-
 .../cluster/management/ItClusterManagerTest.java   |   2 +
 .../internal/deployunit/DeploymentManagerImpl.java |  13 +
 .../internal/deployunit/FileDeployerService.java   |  34 +-
 .../internal/deployunit/IgniteDeployment.java      |  22 +-
 modules/compute/build.gradle                       |   2 +
 .../ignite/internal/compute/ComputeComponent.java  |  28 +-
 .../internal/compute/ComputeComponentImpl.java     |  97 ++-
 .../internal/compute/ComputeMessageTypes.java      |   6 +
 .../ignite/internal/compute/IgniteComputeImpl.java |  96 +--
 .../ignite/internal/compute/JobClassLoader.java    |   8 +
 .../internal/compute/JobClassLoaderFactory.java    |  80 ++-
 ...{ExecuteRequest.java => DeploymentUnitMsg.java} |  40 +-
 .../internal/compute/message/ExecuteRequest.java   |   8 +
 .../internal/compute/ComputeComponentImplTest.java |  75 +--
 .../internal/compute/IgniteComputeImplTest.java    |  44 +-
 .../compute/JobClassLoaderFactoryTest.java         | 152 +++--
 .../ignite/internal/util/CollectionUtils.java      |  23 +
 .../java/org/apache/ignite/lang/ErrorGroups.java   |  10 +-
 .../distributionzones/DistributionZoneManager.java |   6 +-
 .../distributionzones/DistributionZonesUtil.java   |   2 +-
 .../DistributionZoneAwaitDataNodesTest.java        |   2 +-
 .../DistributionZoneManagerTest.java               |   6 +-
 .../apache/ignite/internal/index/IndexManager.java |  48 +-
 modules/jdbc/build.gradle                          |   1 +
 .../jdbc/ItJdbcMetadataSelfTest.java               | 256 +++++---
 .../ignite/internal/jdbc/JdbcDatabaseMetadata.java |  82 +--
 .../apache/ignite/internal/jdbc/JdbcResultSet.java |  44 +-
 .../server/persistence/RocksDbKeyValueStorage.java |  22 +-
 .../server/BasicOperationsKeyValueStorageTest.java |   2 +-
 .../server/TestRocksDbKeyValueStorageTest.java     |  79 +++
 .../server/TestRocksDbKeyValueStorage.java         |  45 ++
 .../internal/network/processor/TypeUtils.java      |  21 +-
 .../messages/MarshallableTypesBlackList.java       |  37 +-
 .../processor/messages/MessageImplGenerator.java   |   6 +-
 .../src/main/resources/marshallable.blacklist      |   4 +
 .../ignite/network/DefaultMessagingService.java    |  20 +-
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   1 +
 .../scalecube/ScaleCubeTopologyService.java        |  10 +-
 .../processor/MarshallableBlacklistTest.java       |  36 +-
 .../processor/TransferableObjectProcessorTest.java |  13 -
 .../network/DefaultMessagingServiceTest.java       |   1 +
 .../scalecube/ScaleCubeTopologyServiceTest.java    |  69 ++
 .../src/test/resources/marshallable.blacklist      |   1 +
 .../persistence/store/FilePageStoreManager.java    |   6 +-
 .../PersistentPageMemoryNoLoadTest.java            |   2 +-
 .../store/FilePageStoreManagerTest.java            |  34 +-
 .../message/LeaseGrantedMessage.java               |  17 +-
 .../negotiation/LeaseNegotiator.java               |   4 +-
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |   1 +
 .../raft/jraft/disruptor/StripedDisruptor.java     |   2 +-
 .../ignite/internal/replicator/ReplicaManager.java |  42 +-
 .../ignite/internal/replicator/ReplicaService.java |  50 +-
 .../exception/ReplicaStoppingException.java}       |  31 +-
 .../replicator/PlacementDriverReplicaSideTest.java |   4 +-
 modules/runner/build.gradle                        |   5 +
 .../java/org/apache/ignite/internal/Cluster.java   |  26 +-
 .../internal/ClusterPerTestIntegrationTest.java    |  19 +-
 .../{ItComputeTest.java => ItComputeBaseTest.java} | 101 ++-
 .../compute/ItComputeTestBaseEmbedded.java         |  89 +++
 .../internal/compute/ItComputeTestStandalone.java  | 106 ++++
 .../internal/compute/ItLogicalTopologyTest.java    |  40 +-
 .../storage/ItRebalanceDistributedTest.java        |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../runner/app/client/ItThinClientComputeTest.java |  32 +-
 .../sql/engine/ClusterPerClassIntegrationTest.java |  11 +-
 .../internal/sql/engine/ItDataTypesTest.java       |   4 +-
 .../sql/engine/ItDynamicParameterTest.java         | 111 +---
 .../internal/sql/engine/ItFunctionsTest.java       |  34 +-
 .../varbinary/ItVarBinaryExpressionTest.java       |   8 +-
 .../org/apache/ignite/internal/ssl/ItSslTest.java  |   4 +-
 .../apache/ignite/internal/start/ItStartTest.java  |   2 +-
 .../units/apache-ignite-1.0-SNAPSHOT-src.zip       | Bin 0 -> 69608 bytes
 .../resources/units/ignite-jobs-1.0-SNAPSHOT.jar   | Bin 0 -> 4773 bytes
 .../org/apache/ignite/internal/app/IgniteImpl.java |  32 +-
 .../ignite/internal/sql/util/SqlTestUtils.java     | 158 +++++
 .../sql/engine/exec/ddl/DdlCommandHandler.java     |   4 +-
 .../engine/exec/ddl/DdlCommandHandlerWrapper.java  |  34 +
 .../exec/ddl/DdlToCatalogCommandConverter.java     |  58 +-
 .../internal/sql/engine/exec/exp/RexImpTable.java  |   5 +-
 .../sql/engine/schema/CatalogSqlSchemaManager.java |  44 +-
 .../sql/engine/schema/IgniteSchemaIndex.java       |  22 +-
 .../sql/engine/schema/IgniteTableImpl.java         |  19 +-
 .../sql/engine/sql/fun/IgniteSqlOperatorTable.java |  20 +-
 .../sql/engine/sql/fun/SqlSubstringFunction.java   | 177 ++++++
 .../ignite/internal/sql/engine/util/Commons.java   |  28 +-
 .../internal/sql/engine/util/IgniteMethod.java     |   5 +-
 .../internal/sql/engine/StopCalciteModuleTest.java |   7 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   2 +-
 .../DdlCommandHandlerExceptionHandlingTest.java    |   4 +-
 .../engine/schema/CatalogSqlSchemaManagerTest.java |  73 +--
 .../internal/storage/engine/MvTableStorage.java    |  45 +-
 .../storage/engine/StorageTableDescriptor.java}    |  33 +-
 .../storage/index/BinaryTupleComparator.java       |  10 +-
 .../internal/storage/index/HashIndexStorage.java   |   2 +-
 .../internal/storage/index/SortedIndexStorage.java |   2 +-
 ...riptor.java => StorageHashIndexDescriptor.java} |  30 +-
 ...Descriptor.java => StorageIndexDescriptor.java} |  24 +-
 ...ptor.java => StorageSortedIndexDescriptor.java} |  37 +-
 .../internal/storage/util/MvPartitionStorages.java |  27 +-
 .../storage/index/BinaryTupleComparatorTest.java   |  38 +-
 .../storage/index/TestHashIndexStorageTest.java    |  10 +-
 .../storage/index/TestSortedIndexStorageTest.java  |  11 +-
 .../storage/util/MvPartitionStoragesTest.java      |  16 +-
 .../storage/AbstractMvTableStorageTest.java        |  47 +-
 .../internal/storage/BaseMvStoragesTest.java       |   4 +-
 .../internal/storage/impl/TestMvTableStorage.java  |  56 +-
 .../internal/storage/impl/TestStorageEngine.java   |   5 +-
 .../index/AbstractHashIndexStorageTest.java        |   6 +-
 .../storage/index/AbstractIndexStorageTest.java    |   9 +-
 .../index/AbstractSortedIndexStorageTest.java      |  10 +-
 .../index/impl/BinaryTupleRowSerializer.java       |   4 +-
 .../storage/index/impl/TestHashIndexStorage.java   |   8 +-
 .../internal/storage/index/impl/TestIndexRow.java  |   4 +-
 .../storage/index/impl/TestSortedIndexStorage.java |   8 +-
 .../pagememory/AbstractPageMemoryTableStorage.java |  54 +-
 .../PersistentPageMemoryTableStorage.java          | 132 ++--
 .../pagememory/VolatilePageMemoryTableStorage.java |  54 +-
 .../index/AbstractPageMemoryIndexStorage.java      | 127 +++-
 .../storage/pagememory/index/InlineUtils.java      |  14 +-
 .../pagememory/index/common/IndexRowKey.java}      |  20 +-
 .../pagememory/index/hash/HashIndexRowKey.java     |   9 +-
 .../pagememory/index/hash/HashIndexTree.java       |   4 +-
 .../index/hash/PageMemoryHashIndexStorage.java     |  55 +-
 .../index/sorted/PageMemorySortedIndexStorage.java | 192 +-----
 .../pagememory/index/sorted/SortedIndexRowKey.java |   9 +-
 .../pagememory/index/sorted/SortedIndexTree.java   |   4 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  58 +-
 .../mv/PersistentPageMemoryMvPartitionStorage.java |  24 +-
 .../mv/VolatilePageMemoryMvPartitionStorage.java   |  48 +-
 .../AbstractPageMemoryHashIndexStorageTest.java    |   4 +-
 .../AbstractPageMemorySortedIndexStorageTest.java  |   4 +-
 .../storage/pagememory/index/InlineUtilsTest.java  |  22 +-
 .../PersistentPageMemoryHashIndexStorageTest.java  |   8 +-
 ...PersistentPageMemorySortedIndexStorageTest.java |   8 +-
 .../VolatilePageMemoryHashIndexStorageTest.java    |   8 +-
 .../VolatilePageMemorySortedIndexStorageTest.java  |   7 +-
 .../ignite/internal/storage/rocksdb/HashIndex.java |   6 +-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |   2 +-
 .../storage/rocksdb/RocksDbTableStorage.java       |  88 ++-
 .../internal/storage/rocksdb/SortedIndex.java      |   6 +-
 .../rocksdb/index/AbstractRocksDbIndexStorage.java | 143 +++++
 .../index/RocksDbBinaryTupleComparator.java        |   6 +-
 .../rocksdb/index/RocksDbHashIndexStorage.java     |  55 +-
 .../rocksdb/index/RocksDbSortedIndexStorage.java   | 135 +---
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |   2 +-
 .../storage/rocksdb/RocksDbStorageEngineTest.java  |   7 +-
 .../rocksdb/index/RocksDbHashIndexStorageTest.java |   7 +-
 .../index/RocksDbSortedIndexStorageTest.java       |   7 +-
 .../distributed/ItTxDistributedTestSingleNode.java |  10 +-
 .../ignite/distributed/ReplicaUnavailableTest.java | 102 ++-
 .../internal/table/KeyValueBinaryViewImpl.java     |  10 +
 .../ignite/internal/table/KeyValueViewImpl.java    |  10 +
 .../internal/table/RecordBinaryViewImpl.java       |   9 +
 .../ignite/internal/table/RecordViewImpl.java      |   9 +
 .../apache/ignite/internal/table/TableImpl.java    |   8 +-
 .../internal/table/distributed/TableManager.java   |   8 +-
 .../table/distributed/TableMessageGroup.java       |   6 +
 .../raft/snapshot/PartitionAccessImpl.java         |   8 +-
 .../replication/request/BinaryTupleMessage.java}   |  39 +-
 .../request/MultipleRowReplicaRequest.java         |  21 +-
 .../request/ReadOnlyReplicaRequest.java            |   8 +-
 .../request/ScanRetrieveBatchReplicaRequest.java   |  12 +-
 .../request/SingleRowReplicaRequest.java           |   9 +-
 .../replication/request/SwapRowReplicaRequest.java |  16 +-
 .../replicator/PartitionReplicaListener.java       |  63 +-
 .../distributed/schema/ColumnDefinitionDiff.java   |   8 +-
 .../table/distributed/schema/FullTableSchema.java  |  38 +-
 .../distributed/schema/NonHistoricSchemas.java     |  10 +-
 .../distributed/schema/TableDefinitionDiff.java    |  28 +-
 .../distributed/storage/InternalTableImpl.java     |  96 +--
 .../org/apache/ignite/internal/table/Example.java  | 695 ---------------------
 .../internal/table/distributed/IndexBaseTest.java  |  20 +-
 .../raft/snapshot/PartitionAccessImplTest.java     |  27 +-
 .../incoming/IncomingSnapshotCopierTest.java       |  40 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |  23 +-
 .../replication/PartitionReplicaListenerTest.java  | 139 +++--
 .../distributed/schema/FullTableSchemaTest.java    |  34 +-
 .../table/impl/DummyInternalTableImpl.java         |   6 +-
 .../ignite/internal/table/impl/DummySchemas.java   |   4 +-
 261 files changed, 6863 insertions(+), 3706 deletions(-)


[ignite-3] 02/02: fix

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17765-2
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 05cf5031dc0271e417031bb9dba4e64a29b71a14
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Jun 13 19:06:06 2023 +0300

    fix
---
 .../internal/sql/engine/SqlQueryProcessor.java     | 323 +++++++++++----------
 .../internal/sql/engine/prepare/CacheKey.java      |  19 +-
 .../sql/engine/schema/CatalogSqlSchemaManager.java |   5 +
 .../sql/engine/schema/SqlSchemaManager.java        |   5 +
 4 files changed, 197 insertions(+), 155 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index ffa661fe87..f3dd71b238 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.sql.engine.SqlQueryType.DML;
 import static org.apache.ignite.internal.sql.engine.SqlQueryType.QUERY;
 import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
@@ -33,13 +34,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.function.Supplier;
@@ -53,6 +52,7 @@ import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.index.event.IndexEvent;
 import org.apache.ignite.internal.index.event.IndexEventParameters;
@@ -90,7 +90,6 @@ import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
 import org.apache.ignite.internal.sql.engine.sql.ParseResult;
 import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.sql.engine.util.BaseQueryContext.Builder;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.storage.DataStorageManager;
@@ -110,7 +109,7 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- *  SqlQueryProcessor.
+ * SqlQueryProcessor.
  *  TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class SqlQueryProcessor implements QueryProcessor {
@@ -126,8 +125,8 @@ public class SqlQueryProcessor implements QueryProcessor {
     private static final long SESSION_EXPIRE_CHECK_PERIOD = TimeUnit.SECONDS.toMillis(1);
 
     /**
-     * Duration in milliseconds after which the session will be considered expired if no action have been performed
-     * on behalf of this session during this period.
+     * Duration in milliseconds after which the session will be considered expired if no action has been performed on behalf of this
+     * session during this period.
      */
     private static final long DEFAULT_SESSION_IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(15);
 
@@ -390,22 +389,123 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
             SessionId sessionId,
-            QueryContext context,
+            QueryContext queryContext,
             String sql,
             Object... params
     ) {
         Session session = sessionManager.session(sessionId);
 
         if (session == null) {
-            return CompletableFuture.failedFuture(
+            return failedFuture(
                     new SqlException(SESSION_NOT_FOUND_ERR, format("Session not found [{}]", sessionId)));
         }
 
+        QueryCancel queryCancel;
+
+        try {
+            queryCancel = createQueryCancel(session);
+        } catch (IllegalStateException ex) {
+            return failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR,
+                    format("Session has been expired [{}]", session.sessionId()), ex));
+        }
+
         String schemaName = session.properties().get(QueryProperty.DEFAULT_SCHEMA);
 
-        InternalTransaction outerTx = context.unwrap(InternalTransaction.class);
+        InternalTransaction outerTx = queryContext.unwrap(InternalTransaction.class);
+        long timestamp = outerTx == null ? clock.nowLong() : outerTx.startTimestamp().longValue();
+
+        boolean implicitTx = outerTx == null;
+
+        int plannerCatalogVersion = sqlSchemaManager.actualCatalogVersion(timestamp);
+        SchemaPlus schema = sqlSchemaManager.schema(schemaName, plannerCatalogVersion);
+
+        if (schema == null) {
+            return failedFuture(new SchemaNotFoundException(schemaName));
+        }
+
+        CacheKey cacheKey = new CacheKey(schemaName, plannerCatalogVersion, sql, params);
+
+        BaseQueryContext plannerContext = BaseQueryContext.builder()
+                .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+                .logger(LOG)
+                .cancel(queryCancel)
+                .parameters(params)
+                .plannerTimeout(PLANNER_TIMEOUT)
+                .build();
+
+        CompletableFuture<QueryPlan> planFuture = queryCache.get(cacheKey);
+
+        if (planFuture == null) {
+            planFuture = CompletableFuture.supplyAsync(() -> {
+                // Parse query
+                StatementParseResult parseResult = IgniteSqlParser.parse(sql, StatementParseResult.MODE);
+                SqlNode sqlNode = parseResult.statement();
 
-        QueryCancel queryCancel = new QueryCancel();
+                // Validate statement
+                validateParsedStatement(queryContext, parseResult, sqlNode, params);
+
+                return sqlNode;
+            }).thenCompose(sqlNode -> {
+                if (skipCache(Commons.getQueryType(sqlNode))) {
+                    // Prepare query plan without caching.
+                    return prepareSvc.prepareAsync(sqlNode, plannerContext);
+                }
+
+                // Try query plan for normalized query, or create a new one asynchronously.
+                CacheKey normalizedQueryCacheKey = new CacheKey(schemaName, plannerCatalogVersion, sqlNode.toString(), params);
+
+                CompletableFuture<QueryPlan> planFuture0 = queryCache.computeIfAbsent(
+                        normalizedQueryCacheKey,
+                        k -> prepareSvc.prepareAsync(sqlNode, plannerContext)
+                );
+
+                queryCache.putIfAbsent(cacheKey, planFuture0);
+
+                // Copy shared plan.
+                return planFuture0.thenApply(QueryPlan::copy);
+            });
+        } else {
+            // Copy shared plan.
+            planFuture = planFuture.thenApply(QueryPlan::copy);
+        }
+
+        return planFuture
+                .thenCompose(plan -> {
+                    // Validate plan
+                    if (SqlQueryType.DDL == plan.type() && outerTx != null) {
+                        return failedFuture(new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support transactions."));
+                    }
+
+                    InternalTransaction tx = implicitTx ? txManager.begin(plan.type() != DML) : outerTx;
+
+                    try {
+                        int txCatalogVersion = catalogManager.activeCatalogVersion(tx.startTimestamp().longValue());
+
+                        if (implicitTx && plannerCatalogVersion != txCatalogVersion) {
+                            LOG.info("Retry query planning: plannerCatalogVersion={}, txCatalogVersion={}",
+                                    plannerCatalogVersion, txCatalogVersion);
+
+                            return tx.rollbackAsync()
+                                    .thenComposeAsync(ignore -> querySingle0(sessionId, queryContext, sql, params), taskExecutor);
+                        }
+
+                        return CompletableFuture.completedFuture(executePlan(session, tx, plan, plannerContext));
+                    } catch (Throwable th) {
+                        handleQueryException(th, queryCancel, implicitTx ? tx : null);
+
+                        return failedFuture(th);
+                    }
+                });
+    }
+
+    private static boolean skipCache(@Nullable SqlQueryType queryType) {
+        return queryType != QUERY && queryType != DML;
+    }
+
+    @NotNull
+    private QueryCancel createQueryCancel(Session session) {
+        QueryCancel queryCancel;
+        queryCancel = new QueryCancel();
 
         AsyncCloseable closeableResource = () -> CompletableFuture.runAsync(
                 queryCancel::cancel,
@@ -413,114 +513,52 @@ public class SqlQueryProcessor implements QueryProcessor {
         );
 
         queryCancel.add(() -> session.unregisterResource(closeableResource));
+        session.registerResource(closeableResource);
+        return queryCancel;
+    }
 
-        try {
-            session.registerResource(closeableResource);
-        } catch (IllegalStateException ex) {
-            return CompletableFuture.failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR,
-                    format("Session has been expired [{}]", session.sessionId()), ex));
-        }
 
-        CompletableFuture<Void> start = new CompletableFuture<>();
-
-        boolean implicitTxRequired = outerTx == null;
-        AtomicReference<InternalTransaction> tx = new AtomicReference<>();
-
-        CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
-                .thenCompose(v -> {
-                    Builder contextBuilder = BaseQueryContext.builder()
-                            .logger(LOG)
-                            .cancel(queryCancel)
-                            .parameters(params)
-                            .plannerTimeout(PLANNER_TIMEOUT);
-
-                    CompletableFuture<PlanWithContext>[] newPlanHolder = new CompletableFuture[1];
-
-                    CompletableFuture<QueryPlan> cachedPlan = queryCache.computeIfAbsent(new CacheKey(schemaName, sql, params), (k) -> {
-                        // Parse query.
-                        StatementParseResult parseResult = IgniteSqlParser.parse(sql, StatementParseResult.MODE);
-                        SqlNode sqlNode = parseResult.statement();
-
-                        validateParsedStatement(context, outerTx, parseResult, sqlNode, params);
-
-                        // Build context.
-                        tx.set(implicitTxRequired ? txManager.begin(!dataModificationOp(sqlNode)) : outerTx);
-                        SchemaPlus schema = resolveSchema(schemaName);
-                        BaseQueryContext ctx = contextBuilder.frameworkConfig(
-                                Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()).build();
-
-                        CompletableFuture<QueryPlan> planFuture = prepareSvc.prepareAsync(sqlNode, ctx);
-
-                        SqlQueryType queryType = Commons.getQueryType(sqlNode);
-                        boolean putPlanIntoCache = queryType == QUERY || queryType == DML;
-
-                        newPlanHolder[0] = planFuture.thenApply(plan -> new PlanWithContext(putPlanIntoCache ? plan.copy() : plan, ctx));
-
-                        return putPlanIntoCache ? planFuture : null;
-                    });
-
-                    return Objects.requireNonNullElseGet(
-                            newPlanHolder[0],
-                            () -> cachedPlan.thenApply(plan -> {
-                                // Build query context for execution.
-                                tx.set(implicitTxRequired ? txManager.begin(plan.type() != DML) : outerTx);
-                                SchemaPlus schema = resolveSchema(schemaName);
-                                BaseQueryContext ctx = contextBuilder.frameworkConfig(
-                                        Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()).build();
-
-                                return new PlanWithContext(plan.copy(), ctx);
-                            })
-                    ).thenApply(planWithContext -> {
-                        QueryPlan plan = planWithContext.plan;
-                        BaseQueryContext ctx = planWithContext.context;
-
-                        var dataCursor = executionSrvc.executePlan(tx.get(), plan, ctx);
-
-                        SqlQueryType queryType = plan.type();
-                        assert queryType != null : "Expected a full plan but got a fragment: " + plan;
-
-                        return new AsyncSqlCursorImpl<>(
-                                queryType,
-                                plan.metadata(),
-                                implicitTxRequired ? tx.get() : null,
-                                new AsyncCursor<List<Object>>() {
-                                    @Override
-                                    public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
-                                        session.touch();
-
-                                        return dataCursor.requestNextAsync(rows);
-                                    }
-
-                                    @Override
-                                    public CompletableFuture<Void> closeAsync() {
-                                        session.touch();
-
-                                        return dataCursor.closeAsync();
-                                    }
-                                }
-                        );
-                    });
-                });
+    private AsyncSqlCursor<List<Object>> executePlan(Session session, InternalTransaction tx, QueryPlan plan, BaseQueryContext ctx) {
+        var dataCursor = executionSrvc.executePlan(tx, plan, ctx);
 
-        stage.whenComplete((cur, ex) -> {
-            if (ex instanceof CancellationException) {
-                queryCancel.cancel();
-            }
+        SqlQueryType queryType = plan.type();
+        assert queryType != null : "Expected a full plan but got a fragment: " + plan;
+
+        return new AsyncSqlCursorImpl<>(
+                queryType,
+                plan.metadata(),
+                tx,
+                new AsyncCursor<>() {
+                    @Override
+                    public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
+                        session.touch();
+
+                        return dataCursor.requestNextAsync(rows);
+                    }
+
+                    @Override
+                    public CompletableFuture<Void> closeAsync() {
+                        session.touch();
 
-            if (ex != null && outerTx == null) {
-                InternalTransaction tx0 = tx.get();
-                if (tx0 != null) {
-                    tx0.rollback();
+                        return dataCursor.closeAsync();
+                    }
                 }
-            }
-        });
+        );
+    }
 
-        start.completeAsync(() -> null, taskExecutor);
+    private static void handleQueryException(Throwable th, QueryCancel queryCancel, @Nullable InternalTransaction tx) {
+        if (th instanceof CancellationException) {
+            queryCancel.cancel();
+        }
 
-        return stage;
+        if (th != null) {
+            if (tx != null) {
+                tx.rollback();
+            }
+        }
     }
 
-    private SchemaPlus resolveSchema(String schemaName) {
+    private SchemaPlus resolveSchema(String schemaName, HybridTimestamp timestamp) {
         SchemaPlus schema = sqlSchemaManager.schema(schemaName);
 
         if (schema == null) {
@@ -555,11 +593,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableCreated(
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
-                    DEFAULT_SCHEMA_NAME,
-                    parameters.tableId(),
-                    parameters.causalityToken()
-            )
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+                            DEFAULT_SCHEMA_NAME,
+                            parameters.tableId(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -573,11 +611,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableUpdated(
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
-                    DEFAULT_SCHEMA_NAME,
-                    parameters.tableId(),
-                    parameters.causalityToken()
-            )
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+                            DEFAULT_SCHEMA_NAME,
+                            parameters.tableId(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -591,11 +629,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableDropped(
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
-                    DEFAULT_SCHEMA_NAME,
-                    parameters.tableId(),
-                    parameters.causalityToken()
-            )
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+                            DEFAULT_SCHEMA_NAME,
+                            parameters.tableId(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -609,12 +647,12 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onIndexDropped(
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
-                    DEFAULT_SCHEMA_NAME,
-                    parameters.tableId(),
-                    parameters.indexId(),
-                    parameters.causalityToken()
-            )
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+                            DEFAULT_SCHEMA_NAME,
+                            parameters.tableId(),
+                            parameters.indexId(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -628,11 +666,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onIndexCreated(
-                    parameters.tableId(),
-                    parameters.indexId(),
-                    parameters.indexDescriptor(),
-                    parameters.causalityToken()
-            )
+                            parameters.tableId(),
+                            parameters.indexId(),
+                            parameters.indexDescriptor(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -645,7 +683,6 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Performs additional validation of a parsed statement. **/
     private static void validateParsedStatement(
             QueryContext context,
-            InternalTransaction outerTx,
             ParseResult parseResult,
             SqlNode node,
             Object[] params
@@ -665,10 +702,6 @@ public class SqlQueryProcessor implements QueryProcessor {
             throw new QueryValidationException(message);
         }
 
-        if (SqlQueryType.DDL == queryType && outerTx != null) {
-            throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support transactions.");
-        }
-
         if (parseResult.dynamicParamsCount() != params.length) {
             String message = format(
                     "Unexpected number of query parameters. Provided {} but there is only {} dynamic parameter(s).",
@@ -687,14 +720,4 @@ public class SqlQueryProcessor implements QueryProcessor {
             }
         }
     }
-
-    private static final class PlanWithContext {
-        private final QueryPlan plan;
-        private final BaseQueryContext context;
-
-        private PlanWithContext(QueryPlan plan, BaseQueryContext context) {
-            this.plan = plan;
-            this.context = context;
-        }
-    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java
index b6a1c023aa..dbe0f0f957 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java
@@ -29,19 +29,25 @@ public class CacheKey {
 
     private final String schemaName;
 
+    private final int catalogVersion;
+
     private final String query;
 
     private final Class[] paramTypes;
 
+
     /**
      * Constructor.
      *
      * @param schemaName Schema name.
-     * @param query      Query string.
-     * @param params     Dynamic parameters.
+     * @param catalogVersion Catalog version.
+     * @param query Query string.
+     * @param params Dynamic parameters.
      */
-    public CacheKey(String schemaName, String query, Object[] params) {
+    public CacheKey(String schemaName, int catalogVersion, String query, Object[] params) {
+        // TODO: IGNITE-17765 Use schema id instead of name.
         this.schemaName = schemaName;
+        this.catalogVersion = catalogVersion;
         this.query = query;
         this.paramTypes = params.length == 0
                 ? EMPTY_CLASS_ARRAY
@@ -60,6 +66,9 @@ public class CacheKey {
 
         CacheKey cacheKey = (CacheKey) o;
 
+        if (catalogVersion != cacheKey.catalogVersion){
+            return false;
+        }
         if (!schemaName.equals(cacheKey.schemaName)) {
             return false;
         }
@@ -67,14 +76,14 @@ public class CacheKey {
             return false;
         }
 
-        return Arrays.deepEquals(paramTypes, cacheKey.paramTypes);
+        return Arrays.equals(paramTypes, cacheKey.paramTypes);
     }
 
     @Override
     public int hashCode() {
-        int result = schemaName.hashCode();
+        int result = 31 * schemaName.hashCode() + catalogVersion; // Compared in equals(), so it must contribute to the hash too.
         result = 31 * result + query.hashCode();
-        result = 31 * result + Arrays.deepHashCode(paramTypes);
+        result = 31 * result + Arrays.hashCode(paramTypes);
         return result;
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
index d18bece8f9..9975062ef9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
@@ -107,6 +107,11 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager {
         return cache.computeIfAbsent(entry, (v) -> createSqlSchema(v.getValue(), descriptor));
     }
 
+    @Override
+    public int actualCatalogVersion(long timestamp) {
+        return catalogManager.activeCatalogVersion(timestamp);
+    }
+
     private SchemaPlus createSqlSchema(int version, CatalogSchemaDescriptor descriptor) {
         String schemaName = descriptor.name();
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
index ef45bc8598..3031aa7bf6 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
@@ -53,4 +53,9 @@ public interface SqlSchemaManager {
      * Returns a required schema if specified, or default schema otherwise.
      */
     SchemaPlus activeSchema(@Nullable String name, long timestamp);
+
+    /**
+     * Returns the catalog version that is active at the given timestamp.
+     */
+    int actualCatalogVersion(long timestamp);
 }