You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/10/27 17:19:50 UTC

[ignite-3] branch ignite-3.0.0-beta1 updated: IGNITE-17330 Support read only transactions from sql side - Fixes #1232.

This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
     new 7b8b271d49 IGNITE-17330 Support read only transactions from sql side - Fixes #1232.
7b8b271d49 is described below

commit 7b8b271d49e76d37ea29de0d20120728fa2040d0
Author: zstan <st...@gmail.com>
AuthorDate: Thu Oct 27 20:12:56 2022 +0300

    IGNITE-17330 Support read only transactions from sql side - Fixes #1232.
    
    Signed-off-by: zstan <st...@gmail.com>
    (cherry picked from commit d1405efac259beee360f04cdfed06d629d7780d4)
---
 .../app/jdbc/ItJdbcInsertStatementSelfTest.java    |  2 +-
 .../runner/app/jdbc/ItJdbcStatementSelfTest.java   |  2 +-
 .../internal/sql/api/ItSqlAsynchronousApiTest.java | 57 ++++++++++++++++++++++
 .../internal/sql/api/AsyncResultSetImpl.java       |  6 ---
 .../internal/sql/engine/SqlQueryProcessor.java     | 31 +++++++++---
 .../sql/engine/exec/ExchangeServiceImpl.java       |  9 ++--
 .../internal/sql/engine/exec/ExecutionContext.java | 19 +++++---
 .../sql/engine/exec/ExecutionServiceImpl.java      | 27 +++++-----
 .../sql/engine/exec/LogicalRelImplementor.java     |  8 +--
 .../sql/engine/exec/rel/TableScanNode.java         |  8 ++-
 .../sql/engine/message/QueryStartRequest.java      |  8 +++
 .../sql/engine/metadata/FragmentDescription.java   |  2 +-
 .../sql/engine/prepare/PrepareServiceImpl.java     |  5 +-
 .../internal/sql/engine/trait/TraitUtils.java      | 16 ++++++
 .../internal/sql/engine/util/BaseQueryContext.java | 21 +++++++-
 .../internal/sql/engine/StopCalciteModuleTest.java |  2 +
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  6 ++-
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |  4 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |  9 +++-
 19 files changed, 191 insertions(+), 51 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcInsertStatementSelfTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcInsertStatementSelfTest.java
index dbb332b917..db195269f2 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcInsertStatementSelfTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcInsertStatementSelfTest.java
@@ -206,7 +206,7 @@ public class ItJdbcInsertStatementSelfTest extends ItJdbcAbstractStatementSelfTe
 
         assertTrue(resultSet.next());
 
-        assertEquals(3, resultSet.getInt(1));
+        assertEquals(1, resultSet.getInt(1));
 
         doCheck();
     }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcStatementSelfTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcStatementSelfTest.java
index fb70a97aeb..1c75419cb5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcStatementSelfTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcStatementSelfTest.java
@@ -119,7 +119,7 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
 
             int val = rs.getInt(1);
 
-            assertTrue(val >= 1 && val <= 10, "Invalid val: " + val);
+            assertEquals(5, val);
         }
 
         stmt.close();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index e6e5de129a..6b651caef0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -76,6 +76,7 @@ import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
@@ -321,6 +322,16 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
 
         rs.closeAsync();
 
+        outerTx = igniteTx().begin();
+
+        rs = ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY VAL0").get();
+
+        assertEquals(2 * ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+
+        rs.closeAsync();
+
+        outerTx.commit();
+
         checkDml(2 * ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
 
         checkDml(2 * ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
@@ -332,6 +343,51 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         assertEquals(txManagerInternal.finished(), states.size());
     }
 
+    /** Check correctness of rw and ro transactions. */
+    @Test
+    public void checkMixedTransactions() throws Exception {
+        IgniteSql sql = igniteSql();
+
+        if (sql instanceof ClientSql) {
+            return;
+        }
+
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        Session ses = sql.createSession();
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        checkTx(ses, true, false, true);
+        checkTx(ses, true, false, false);
+        checkTx(ses, true, true, true);
+        checkTx(ses, true, true, false);
+        checkTx(ses, false, true, true);
+        checkTx(ses, false, true, false);
+        checkTx(ses, false, false, true);
+        checkTx(ses, false, false, false);
+    }
+
+    private void checkTx(Session ses, boolean readOnly, boolean commit, boolean explicit) throws Exception {
+        Transaction outerTx = explicit ? (readOnly ? igniteTx().readOnly().begin() : igniteTx().begin()) : null;
+
+        AsyncResultSet rs = ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY VAL0").get();
+
+        assertEquals(ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+
+        rs.closeAsync();
+
+        if (outerTx != null) {
+            if (commit) {
+                outerTx.commit();
+            } else {
+                outerTx.rollback();
+            }
+        }
+    }
+
     @Test
     public void select() throws ExecutionException, InterruptedException {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
@@ -521,6 +577,7 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         }
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17998")
     @Test
     public void closeSession() throws ExecutionException, InterruptedException {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 39b64a5694..a1d46be888 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -67,12 +67,6 @@ public class AsyncResultSetImpl implements AsyncResultSet {
         this.curPage = page;
         this.pageSize = pageSize;
         this.closeRun = closeRun;
-
-        assert cur.queryType() == SqlQueryType.QUERY
-                || ((cur.queryType() == SqlQueryType.DML || cur.queryType() == SqlQueryType.DDL)
-                && curPage.items().size() == 1
-                && curPage.items().get(0).size() == 1
-                && !curPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + curPage + ']';
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 98cf2e9af4..3f86aae8af 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -38,10 +38,13 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.index.event.IndexEvent;
 import org.apache.ignite.internal.index.event.IndexEventParameters;
@@ -61,7 +64,6 @@ import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan.Type;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
@@ -185,7 +187,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         ));
 
         var exchangeService = registerService(new ExchangeServiceImpl(
-                nodeName,
+                clusterSrvc.topologyService().localMember(),
                 taskExecutor,
                 mailboxRegistry,
                 msgSrvc
@@ -387,17 +389,22 @@ public class SqlQueryProcessor implements QueryProcessor {
                     return nodes.get(0);
                 })
                 .thenCompose(sqlNode -> {
+                    final boolean rwOp = dataModificationOp(sqlNode);
+                    final HybridTimestamp txTime = outerTx != null ? outerTx.readTimestamp() : rwOp ? null : new HybridClock().now();
+
                     BaseQueryContext ctx = BaseQueryContext.builder()
                             .frameworkConfig(
                                     Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
                                             .defaultSchema(schema)
-                                            .traitDefs(Commons.LOCAL_TRAITS_SET)
+                                            .traitDefs(rwOp || (outerTx != null && !outerTx.isReadOnly()) ? Commons.LOCAL_TRAITS_SET :
+                                                    Commons.DISTRIBUTED_TRAITS_SET)
                                             .build()
                             )
                             .logger(LOG)
                             .cancel(queryCancel)
                             .parameters(params)
                             .transaction(outerTx)
+                            .transactionTime(txTime)
                             .plannerTimeout(PLANNER_TIMEOUT)
                             .build();
 
@@ -406,9 +413,8 @@ public class SqlQueryProcessor implements QueryProcessor {
                                 context.maybeUnwrap(QueryValidator.class)
                                         .ifPresent(queryValidator -> queryValidator.validatePlan(plan));
 
-                                // Transactional DDL is not supported as well as RO transactions, hence
-                                // only DML requiring RW transaction is covered
-                                boolean implicitTxRequired = (plan.type() == Type.DML || plan.type() == Type.QUERY) && outerTx == null;
+                                boolean implicitTxRequired = outerTx == null && rwOp;
+
                                 InternalTransaction implicitTx = implicitTxRequired ? txManager.begin() : null;
 
                                 BaseQueryContext enrichedContext =
@@ -469,16 +475,22 @@ public class SqlQueryProcessor implements QueryProcessor {
         CompletableFuture<Void> start = new CompletableFuture<>();
 
         for (SqlNode sqlNode : nodes) {
+            boolean needStartTx = SqlKind.DML.contains(sqlNode.getKind()) || SqlKind.QUERY.contains(sqlNode.getKind());
+            // Only rw transactions for now.
+            InternalTransaction implicitTx = needStartTx ? txManager.begin() : null;
+
             final BaseQueryContext ctx = BaseQueryContext.builder()
                     .cancel(new QueryCancel())
                     .frameworkConfig(
                             Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                                    .traitDefs(needStartTx ? Commons.LOCAL_TRAITS_SET : Commons.DISTRIBUTED_TRAITS_SET)
                                     .defaultSchema(schema)
                                     .build()
                     )
                     .logger(LOG)
                     .parameters(params)
                     .plannerTimeout(PLANNER_TIMEOUT)
+                    .transaction(implicitTx)
                     .build();
 
             // TODO https://issues.apache.org/jira/browse/IGNITE-17746 Fix query execution flow.
@@ -490,7 +502,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                         return new AsyncSqlCursorImpl<>(
                                 SqlQueryType.mapPlanTypeToSqlType(plan.type()),
                                 plan.metadata(),
-                                null,
+                                implicitTx,
                                 executionSrvc.executePlan(plan, ctx)
                         );
                     });
@@ -620,4 +632,9 @@ public class SqlQueryProcessor implements QueryProcessor {
                     .thenApply(v -> false);
         }
     }
+
+    /** Returns {@code true} if this is data modification operation. */
+    private static boolean dataModificationOp(SqlNode sqlNode) {
+        return SqlKind.DML.contains(sqlNode.getKind());
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index 1ec7f53dc0..73d94a4ef0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
 
 /**
  * ExchangeServiceImpl. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -49,7 +50,7 @@ public class ExchangeServiceImpl implements ExchangeService {
 
     private static final SqlQueryMessagesFactory FACTORY = new SqlQueryMessagesFactory();
 
-    private final String localNodeId;
+    private final ClusterNode localNode;
 
     private final QueryTaskExecutor taskExecutor;
 
@@ -61,12 +62,12 @@ public class ExchangeServiceImpl implements ExchangeService {
      * Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public ExchangeServiceImpl(
-            String localNodeId,
+            ClusterNode localNode,
             QueryTaskExecutor taskExecutor,
             MailboxRegistry mailboxRegistry,
             MessageService msgSrvc
     ) {
-        this.localNodeId = localNodeId;
+        this.localNode = localNode;
         this.taskExecutor = taskExecutor;
         this.mailboxRegistry = mailboxRegistry;
         this.msgSrvc = msgSrvc;
@@ -221,7 +222,7 @@ public class ExchangeServiceImpl implements ExchangeService {
                         .build(),
                 taskExecutor,
                 qryId,
-                localNodeId,
+                localNode,
                 nodeId,
                 new FragmentDescription(
                         fragmentId,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 14f802ef1a..a544a393c4 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -32,6 +32,7 @@ import java.util.function.Consumer;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
@@ -45,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -74,7 +76,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
 
     private final Map<String, Object> params;
 
-    private final String locNodeId;
+    private final ClusterNode localNode;
 
     private final String originatingNodeId;
 
@@ -111,7 +113,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
             BaseQueryContext qctx,
             QueryTaskExecutor executor,
             UUID qryId,
-            String locNodeId,
+            ClusterNode localNode,
             String originatingNodeId,
             FragmentDescription fragmentDesc,
             RowHandler<RowT> handler,
@@ -126,7 +128,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
         this.fragmentDesc = fragmentDesc;
         this.handler = handler;
         this.params = params;
-        this.locNodeId = locNodeId;
+        this.localNode = localNode;
         this.originatingNodeId = originatingNodeId;
         this.tx = tx;
 
@@ -219,10 +221,10 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
     }
 
     /**
-     * Get local node ID.
+     * Get local node.
      */
-    public String localNodeId() {
-        return locNodeId;
+    public ClusterNode localNode() {
+        return localNode;
     }
 
     /** {@inheritDoc} */
@@ -348,6 +350,11 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
         return tx;
     }
 
+    /** Read only transaction time. */
+    public HybridTimestamp transactionTime() {
+        return qctx.transactionTime();
+    }
+
     /**
      * Sets cancel flag, returns {@code true} if flag was changed by this call.
      *
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index ab71eab935..e1cd92462c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.configuration.ConfigurationChangeException;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -91,7 +92,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
     private final MessageService msgSrvc;
 
-    private final String locNodeId;
+    private final ClusterNode localNode;
 
     private final SqlSchemaManager sqlSchemaManager;
 
@@ -138,7 +139,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             DataStorageManager dataStorageManager
     ) {
         return new ExecutionServiceImpl<>(
-                topSrvc.localMember().id(),
+                topSrvc.localMember(),
                 msgSrvc,
                 new MappingServiceImpl(topSrvc),
                 sqlSchemaManager,
@@ -154,7 +155,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
      * Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     ExecutionServiceImpl(
-            String localNodeId,
+            ClusterNode localNode,
             MessageService msgSrvc,
             MappingService mappingSrvc,
             SqlSchemaManager sqlSchemaManager,
@@ -164,7 +165,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             ExchangeService exchangeSrvc,
             ImplementorFactory<RowT> implementorFactory
     ) {
-        this.locNodeId = localNodeId;
+        this.localNode = localNode;
         this.handler = handler;
         this.msgSrvc = msgSrvc;
         this.mappingSrvc = mappingSrvc;
@@ -201,7 +202,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         return queryManager.execute(plan);
     }
 
-    private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params) {
+    private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params, HybridTimestamp txTime) {
         return BaseQueryContext.builder()
                 .queryId(queryId)
                 .parameters(params)
@@ -211,6 +212,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
                                 .build()
                 )
                 .logger(LOG)
+                .transactionTime(txTime)
                 .build();
     }
 
@@ -291,7 +293,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         assert nodeId != null && msg != null;
 
         DistributedQueryManager queryManager = queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
-            BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters());
+            BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters(), msg.txTime());
 
             return new DistributedQueryManager(ctx);
         });
@@ -382,15 +384,15 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
         private volatile Long rootFragmentId = null;
 
-        private @Nullable InternalTransaction transaction;
+        private @Nullable InternalTransaction tx;
 
         private DistributedQueryManager(
                 BaseQueryContext ctx,
-                @Nullable InternalTransaction transaction
+                @Nullable InternalTransaction tx
         ) {
             this(ctx);
 
-            this.transaction = transaction;
+            this.tx = tx;
         }
 
         private DistributedQueryManager(BaseQueryContext ctx) {
@@ -419,6 +421,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
                     .root(fragment.serialized())
                     .fragmentDescription(desc)
                     .parameters(ctx.parameters())
+                    .txTime(ctx.transactionTime())
                     .build();
 
             var fut = new CompletableFuture<Void>();
@@ -520,12 +523,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
                     ctx,
                     taskExecutor,
                     ctx.queryId(),
-                    locNodeId,
+                    localNode,
                     initiatorNodeId,
                     desc,
                     handler,
                     Commons.parametersMap(ctx.parameters()),
-                    transaction
+                    tx
             );
         }
 
@@ -558,7 +561,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
         private AsyncCursor<List<Object>> execute(MultiStepPlan plan) {
             taskExecutor.execute(() -> {
-                plan.init(mappingSrvc, new MappingQueryContext(locNodeId));
+                plan.init(mappingSrvc, new MappingQueryContext(localNode.id()));
 
                 List<Fragment> fragments = plan.fragments();
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index dc9208d9a7..5d31e63046 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -188,7 +188,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
 
         IgniteDistribution distr = rel.distribution();
         Destination<RowT> dest = distr.destination(ctx, affSrvc, ctx.group(rel.sourceId()));
-        String localNodeId = ctx.localNodeId();
+        String localNodeId = ctx.localNode().id();
 
         FilterNode<RowT> node = new FilterNode<>(ctx, rel.getRowType(), r -> Objects.equals(localNodeId, first(dest.targets(r))));
 
@@ -303,7 +303,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
 
         IgniteIndex idx = tbl.getIndex(rel.indexName());
         ColocationGroup group = ctx.group(rel.sourceId());
-        int[] parts = group.partitions(ctx.localNodeId());
+        int[] parts = group.partitions(ctx.localNode().id());
 
         return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, ranges, filters, prj, requiredColumns.toBitSet());
     }
@@ -328,7 +328,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
 
         ColocationGroup group = ctx.group(rel.sourceId());
 
-        if (!group.nodeIds().contains(ctx.localNodeId())) {
+        if (!group.nodeIds().contains(ctx.localNode().id())) {
             return new ScanNode<>(ctx, rowType, Collections.emptyList());
         }
 
@@ -336,7 +336,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
                 ctx,
                 rowType,
                 tbl,
-                group.partitions(ctx.localNodeId()),
+                group.partitions(ctx.localNode().id()),
                 filters,
                 prj,
                 requiredColumns
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index 8dcd4f3fb4..d902e71c7c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -95,6 +95,8 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
 
         assert !nullOrEmpty(parts);
 
+        assert context().transaction() != null || context().transactionTime() != null : "Transaction not initialized.";
+
         this.physTable = schemaTable.table();
         this.schemaTable = schemaTable;
         this.parts = parts;
@@ -218,7 +220,11 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
         if (subscription != null) {
             subscription.request(waiting);
         } else if (curPartIdx < parts.length) {
-            physTable.scan(parts[curPartIdx++], context().transaction()).subscribe(new SubscriberImpl());
+            if (context().transactionTime() != null) {
+                physTable.scan(parts[curPartIdx++], context().transactionTime(), context().localNode()).subscribe(new SubscriberImpl());
+            } else {
+                physTable.scan(parts[curPartIdx++], context().transaction()).subscribe(new SubscriberImpl());
+            }
         } else {
             waiting = NOT_WAITING;
         }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index 85be0a59d2..73d85c97a7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
 import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * QueryStartRequest interface.
@@ -47,4 +49,10 @@ public interface QueryStartRequest extends ExecutionContextAwareMessage {
      */
     @Marshallable
     Object[] parameters();
+
+    /**
+     * Read only transaction time or null if this is read write transaction.
+     */
+    @Marshallable
+    @Nullable HybridTimestamp txTime();
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
index 674ffa29a0..abd28b6965 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
@@ -82,7 +82,7 @@ public class FragmentDescription implements Serializable {
     }
 
     /**
-     * Get mappring.
+     * Get mapping.
      */
     public FragmentMapping mapping() {
         return mapping;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 70a956d561..7f79156fae 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.prepare;
 
 import static org.apache.ignite.internal.sql.engine.prepare.PlannerHelper.optimize;
+import static org.apache.ignite.internal.sql.engine.trait.TraitUtils.distributionPresent;
 import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_VALIDATION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.USUPPORTED_SQL_OPERATION_KIND_ERR;
 
@@ -215,7 +216,9 @@ public class PrepareServiceImpl implements PrepareService, SchemaUpdateListener
     }
 
     private CompletableFuture<QueryPlan> prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
-        var key = new CacheKey(ctx.schemaName(), sqlNode.toString());
+        boolean distributed = distributionPresent(ctx.config().getTraitDefs());
+
+        var key = new CacheKey(ctx.schemaName(), sqlNode.toString(), distributed);
 
         var planFut = cache.computeIfAbsent(key, k -> CompletableFuture.supplyAsync(() -> {
             IgnitePlanner planner = ctx.planner();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index 7311a3f413..7a0f3c43ab 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
+import com.google.common.collect.ImmutableList;
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
@@ -262,6 +263,21 @@ public class TraitUtils {
         return traits.getTrait(DistributionTraitDef.INSTANCE);
     }
 
+    /**
+     * Check distribution definition in traits.
+     *
+     * @param traitDefs Traits to analyze.
+     * @return {@code true} if distribution found, {@code false} otherwise.
+     */
+    public static boolean distributionPresent(ImmutableList<RelTraitDef> traitDefs) {
+        for (RelTraitDef<?> trait : traitDefs) {
+            if (trait instanceof DistributionTraitDef) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * Collation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 2cd28e9225..2327fa28e2 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -49,6 +49,7 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
@@ -161,6 +162,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
 
     private final InternalTransaction tx;
 
+    private final HybridTimestamp txTs;
+
     private CalciteCatalogReader catalogReader;
 
     private long plannerTimeout;
@@ -175,6 +178,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
             Object[] parameters,
             IgniteLogger log,
             InternalTransaction tx,
+            HybridTimestamp txTs,
             long plannerTimeout
     ) {
         super(Contexts.chain(cfg.getContext()));
@@ -187,6 +191,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
         this.cancel = cancel;
         this.parameters = parameters;
         this.tx = tx;
+        this.txTs = txTs;
         this.plannerTimeout = plannerTimeout;
 
         RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, cfg.getTypeSystem());
@@ -240,6 +245,10 @@ public final class BaseQueryContext extends AbstractQueryContext {
         return tx;
     }
 
+    public HybridTimestamp transactionTime() {
+        return txTs;
+    }
+
     public long plannerTimeout() {
         return plannerTimeout;
     }
@@ -281,7 +290,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
                 .logger(log)
                 .cancel(cancel)
                 .parameters(parameters)
-                .transaction(tx);
+                .transaction(tx)
+                .transactionTime(txTs);
     }
 
     /**
@@ -313,6 +323,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
 
         private InternalTransaction tx;
 
+        private HybridTimestamp txTs;
+
         private long plannerTimeout;
 
         public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
@@ -345,13 +357,18 @@ public final class BaseQueryContext extends AbstractQueryContext {
             return this;
         }
 
+        public Builder transactionTime(HybridTimestamp txTs) {
+            this.txTs = txTs;
+            return this;
+        }
+
         public Builder plannerTimeout(long plannerTimeout) {
             this.plannerTimeout = plannerTimeout;
             return this;
         }
 
         public BaseQueryContext build() {
-            return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, tx, plannerTimeout);
+            return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, tx, txTs, plannerTimeout);
         }
     }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index e2d560e357..f3628ad01e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -219,6 +220,7 @@ public class StopCalciteModuleTest {
 
         when(tbl.tableId()).thenReturn(UUID.randomUUID());
 
+        when(txManager.begin()).thenReturn(mock(InternalTransaction.class));
         when(tbl.storage()).thenReturn(mock(MvTableStorage.class));
         when(tbl.storage().configuration()).thenReturn(mock(TableConfiguration.class));
         when(tbl.storage().configuration().partitions()).thenReturn(mock(ConfigurationValue.class));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 114338f3d2..f9627646d5 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -81,6 +81,8 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -342,7 +344,7 @@ public class ExecutionServiceImplTest {
         var mailboxRegistry = new MailboxRegistryImpl();
 
         var exchangeService = new ExchangeServiceImpl(
-                nodeId,
+                new ClusterNode(nodeId, "fake-test-node", NetworkAddress.from("127.0.0.1:1111")),
                 taskExecutor,
                 mailboxRegistry,
                 messageService
@@ -353,7 +355,7 @@ public class ExecutionServiceImplTest {
         when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
 
         var executionService = new ExecutionServiceImpl<>(
-                nodeId,
+                new ClusterNode(nodeId, "fake-test-node", NetworkAddress.from("127.0.0.1:1111")),
                 messageService,
                 (single, filter) -> single ? List.of(nodeIds.get(ThreadLocalRandom.current().nextInt(nodeIds.size()))) : nodeIds,
                 schemaManagerMock,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index e743e60eb1..af6d1daf2b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -39,6 +39,8 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -111,7 +113,7 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest {
                                 .build(),
                         null,
                         UUID.randomUUID(),
-                        "fake-test-node",
+                        new ClusterNode("1", "fake-test-node", NetworkAddress.from("127.0.0.1:1111")),
                         "fake-test-node",
                         null,
                         ArrayRowHandler.INSTANCE,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 7fb9ce6b6e..efa9e13d33 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
+import static org.mockito.Mockito.mock;
+
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.util.ArrayDeque;
 import java.util.Deque;
@@ -45,7 +47,10 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -103,12 +108,12 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
                         .build(),
                 taskExecutor,
                 UUID.randomUUID(),
-                "fake-test-node",
+                new ClusterNode("1", "fake-test-node", NetworkAddress.from("127.0.0.1:1111")),
                 "fake-test-node",
                 fragmentDesc,
                 ArrayRowHandler.INSTANCE,
                 Map.of(),
-                null
+                mock(InternalTransaction.class)
         );
     }