You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2023/08/25 13:55:54 UTC

[ignite-3] branch main updated: IGNITE-19898 Sql. Added ability to use different transaction facades to start an implicit transaction (#2449)

This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fd6b984d5 IGNITE-19898 Sql. Added ability to use different transaction facades to start an implicit transaction (#2449)
4fd6b984d5 is described below

commit 4fd6b984d5a46ddb2bd6def55365a6b349feb720
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Fri Aug 25 16:55:48 2023 +0300

    IGNITE-19898 Sql. Added ability to use different transaction facades to start an implicit transaction (#2449)
---
 .../internal/cli/CliIntegrationTestBase.java       |   7 +-
 .../client/handler/JdbcQueryEventHandlerImpl.java  |   2 +
 .../requests/sql/ClientSqlExecuteRequest.java      |   4 +-
 .../handler/JdbcQueryEventHandlerImplTest.java     |   8 +-
 .../requests/jdbc/JdbcQueryCursorSelfTest.java     |  15 +-
 .../client/fakes/FakeIgniteQueryProcessor.java     |   3 +-
 .../internal/ClusterPerTestIntegrationTest.java    |   2 +-
 .../benchmark/AbstractOneNodeBenchmark.java        |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   1 -
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  48 ++--
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |  22 +-
 .../sql/engine/ClusterPerClassIntegrationTest.java |   7 +-
 .../engine/datatypes/tests/BaseDataTypeTest.java   |   6 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../sql/engine/util/TestQueryProcessor.java        |  13 +-
 .../ignite/internal/sql/api/IgniteSqlImpl.java     |   9 +-
 .../internal/sql/api/SessionBuilderImpl.java       |   8 +-
 .../ignite/internal/sql/api/SessionImpl.java       |  12 +-
 .../internal/sql/engine/AsyncSqlCursorImpl.java    |  21 +-
 .../ignite/internal/sql/engine/QueryProcessor.java |   6 +-
 .../sql/engine/QueryTransactionWrapper.java        |  59 +++++
 .../internal/sql/engine/SqlQueryProcessor.java     | 252 +++++++++++----------
 .../sql/engine/AsyncSqlCursorImplTest.java         |  51 ++---
 .../engine/QueryTransactionWrapperSelfTest.java    | 131 +++++++++++
 .../internal/sql/engine/StopCalciteModuleTest.java |  10 +-
 .../sql/engine/exec/MockedStructuresTest.java      |  88 +++----
 .../internal/sql/engine/util/QueryChecker.java     |   9 +-
 .../ignite/internal/table/AbstractTableView.java   |   2 +-
 28 files changed, 522 insertions(+), 279 deletions(-)

diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
index becb71c8cd..b06ea4bc9d 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
@@ -29,11 +29,13 @@ import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.sql.engine.AsyncCursor;
 import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.testframework.IntegrationTestBase;
 import org.apache.ignite.table.Table;
+import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.TestInstance;
@@ -88,7 +90,8 @@ public abstract class CliIntegrationTestBase extends IntegrationTestBase {
     }
 
     protected static List<List<Object>> sql(@Nullable Transaction tx, String sql, Object... args) {
-        var queryEngine = ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
+        QueryProcessor queryEngine = ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
+        IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions();
 
         SessionId sessionId = queryEngine.createSession(PropertiesHelper.emptyHolder());
 
@@ -96,7 +99,7 @@ public abstract class CliIntegrationTestBase extends IntegrationTestBase {
             var context = QueryContext.create(SqlQueryType.ALL, tx);
 
             return getAllFromCursor(
-                    await(queryEngine.querySingleAsync(sessionId, context, sql, args))
+                    await(queryEngine.querySingleAsync(sessionId, context, transactions, sql, args))
             );
         } finally {
             queryEngine.closeSession(sessionId);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index c04486f5ec..ea632c4f4d 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -165,6 +165,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
         CompletableFuture<AsyncSqlCursor<List<Object>>> result = connectionContext.doInSession(sessionId -> processor.querySingleAsync(
                 sessionId,
                 context,
+                igniteTransactions,
                 req.sqlQuery(),
                 req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments()
         ));
@@ -271,6 +272,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
         CompletableFuture<AsyncSqlCursor<List<Object>>> result = connCtx.doInSession(sessionId -> processor.querySingleAsync(
                 sessionId,
                 queryContext,
+                igniteTransactions,
                 sql,
                 arg == null ? OBJECT_EMPTY_ARRAY : arg
         ));
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index ad8675dd08..616c949a7d 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -79,7 +79,7 @@ public class ClientSqlExecuteRequest {
             arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
         }
 
-        // TODO IGNITE-19898 SQL implicit RO transaction should use observation timestamp.
+        // TODO IGNITE-20232 Propagate observable timestamp to sql engine using internal API.
         HybridTimestamp unused = HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
 
         return session
@@ -87,7 +87,7 @@ public class ClientSqlExecuteRequest {
                 .thenCompose(asyncResultSet -> {
                     //noinspection StatementWithEmptyBody
                     if (tx == null) {
-                        // TODO IGNITE-19898 Return readTimestamp from implicit RO TX to the client
+                        // TODO IGNITE-20232 Propagate observable timestamp to sql engine using internal API.
                         // out.meta(asyncResultSet.tx().readTimestamp());
                     }
 
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
index 89823b4cb9..6477c9b8bc 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
@@ -175,7 +175,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
 
         when(queryProcessor.createSession(any())).thenReturn(expectedSessionId);
 
-        when(queryProcessor.querySingleAsync(eq(expectedSessionId), any(), any()))
+        when(queryProcessor.querySingleAsync(eq(expectedSessionId), any(), any(), any()))
                 .thenReturn(CompletableFuture.failedFuture(new RuntimeException("This is fine")));
 
         long connectionId = acquireConnectionId();
@@ -185,7 +185,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
         )));
 
         verify(queryProcessor).createSession(any());
-        verify(queryProcessor).querySingleAsync(eq(expectedSessionId), any(), any(), any(Object[].class));
+        verify(queryProcessor).querySingleAsync(eq(expectedSessionId), any(), any(), any(), any(Object[].class));
         verifyNoMoreInteractions(queryProcessor);
     }
 
@@ -240,7 +240,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
 
     @Test
     public void singleTxUsedForMultipleOperations() {
-        when(queryProcessor.querySingleAsync(any(), any(), any()))
+        when(queryProcessor.querySingleAsync(any(), any(), any(), any()))
                 .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Expected")));
 
         Transaction tx = mock(Transaction.class);
@@ -273,7 +273,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
         verify(tx).commitAsync();
 
         verifyNoMoreInteractions(igniteTransactions);
-        verify(queryProcessor, times(5)).querySingleAsync(any(), any(), any(), any(Object[].class));
+        verify(queryProcessor, times(5)).querySingleAsync(any(), any(), any(), any(), any(Object[].class));
     }
 
     @Test
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
index 3d1ae15c63..29bc7db0f4 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
@@ -24,17 +24,26 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
+import org.apache.ignite.internal.sql.engine.QueryTransactionWrapper;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.exec.AsyncWrapper;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
  * Test class for {@link JdbcQueryCursor}.
  */
-public class JdbcQueryCursorSelfTest {
+@ExtendWith(MockitoExtension.class)
+public class JdbcQueryCursorSelfTest extends BaseIgniteAbstractTest {
+    @Mock
+    private QueryTransactionWrapper txWrapper;
+
     private static final List<Integer> ROWS = List.of(1, 2, 3);
 
     private static final int TOTAL_ROWS_COUNT = ROWS.size();
@@ -58,9 +67,9 @@ public class JdbcQueryCursorSelfTest {
         assertEquals(ROWS, results);
     }
 
-    private static List<Integer> fetchFullBatch(int maxRows, int fetchSize) {
+    private List<Integer> fetchFullBatch(int maxRows, int fetchSize) {
         JdbcQueryCursor<Integer> cursor = new JdbcQueryCursor<>(maxRows,
-                new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, null, null,
+                new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, null, txWrapper,
                         new AsyncWrapper<>(CompletableFuture.completedFuture(ROWS.iterator()), Runnable::run)));
 
         List<Integer> results = new ArrayList<>(maxRows);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index 08106c9cc6..c8c73fa78b 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.session.SessionInfo;
+import org.apache.ignite.tx.IgniteTransactions;
 
 /**
  * Fake {@link QueryProcessor}.
@@ -49,7 +50,7 @@ public class FakeIgniteQueryProcessor implements QueryProcessor {
 
     @Override
     public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
-            SessionId sessionid, QueryContext context, String qry,
+            SessionId sessionid, QueryContext context, IgniteTransactions transactions, String qry,
             Object... params) {
         return CompletableFuture.completedFuture(new FakeCursor());
     }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 2413724fc4..9d3c4a57fd 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -207,7 +207,7 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
         QueryContext context = QueryContext.create(SqlQueryType.ALL);
 
         return getAllFromCursor(
-                qryProc.querySingleAsync(sessionId, context, sql, args).join()
+                qryProc.querySingleAsync(sessionId, context, node(0).transactions(), sql, args).join()
         );
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
index 88f177aa8b..3de017059b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
@@ -114,7 +114,7 @@ public class AbstractOneNodeBenchmark {
             var context = QueryContext.create(SqlQueryType.ALL);
 
             getAllFromCursor(
-                    await(queryEngine.querySingleAsync(sessionId, context, sql))
+                    await(queryEngine.querySingleAsync(sessionId, context, clusterNode.transactions(), sql))
             );
         } finally {
             queryEngine.closeSession(sessionId);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index fbaf31240f..5a661e2634 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -400,7 +400,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 indexManager,
                 schemaManager,
                 dataStorageManager,
-                txManager,
                 distributionZoneManager,
                 () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
                 replicaSvc,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 48615d41ba..10ee8fb135 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -459,13 +459,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void select() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
 
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build();
 
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         TestPageProcessor pageProc = new TestPageProcessor(4);
         await(ses.executeAsync(null, "SELECT ID FROM TEST").thenCompose(pageProc));
 
@@ -481,11 +482,12 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void metadata() {
         sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT NULL)");
-        sql("INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
 
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().build();
 
+        ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
+
         AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1, COL0 FROM TEST"));
 
         // Validate columns metadata.
@@ -561,13 +563,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void pageSequence() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
 
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(1).build();
 
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         AsyncResultSet<SqlRow> ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST ORDER BY ID"));
         var p0 = ars0.currentPage();
         AsyncResultSet<SqlRow> ars1 = await(ars0.fetchNextPage());
@@ -601,13 +604,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void errors() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
 
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
 
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         // Parse error.
         checkError(SqlException.class, STMT_PARSE_ERR, "Failed to parse query", ses, "SELECT ID FROM");
 
@@ -685,13 +689,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void closeSession() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
 
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(2).build();
 
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST"));
 
         await(ses.closeAsync());
@@ -769,13 +774,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void resultSetCloseShouldFinishImplicitTransaction() throws InterruptedException {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
 
         IgniteSql sql = igniteSql();
-
         Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST");
 
         AsyncResultSet<SqlRow> ars = f.join();
@@ -790,14 +796,16 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void resultSetFullReadShouldFinishImplicitTransaction() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
 
         IgniteSql sql = igniteSql();
 
         // Fetch all data in one read.
         Session ses = sql.sessionBuilder().defaultPageSize(100).build();
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST");
 
         AsyncResultSet<SqlRow> ars = f.join();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index dc1bf88eb7..518c8faf1a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -249,13 +249,14 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void select() throws Exception {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
 
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build();
 
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         ResultSet<SqlRow> rs = ses.execute(null, "SELECT ID FROM TEST");
 
         Set<Integer> set = Streams.stream(rs).map(r -> r.intValue(0)).collect(Collectors.toSet());
@@ -272,13 +273,13 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void errors() throws InterruptedException {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(2).build();
 
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         // Parse error.
         checkError(SqlException.class, STMT_PARSE_ERR, "Failed to parse query", ses, "SELECT ID FROM");
 
@@ -420,13 +421,14 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void resultSetCloseShouldFinishImplicitTransaction() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
 
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(2).build();
 
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
         ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST");
         assertEquals(1, txManager().pending());
         rs.close();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index 1f31358f2b..bf2fb5ac01 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -287,6 +287,11 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe
             protected QueryProcessor getEngine() {
                 return ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
             }
+
+            @Override
+            protected IgniteTransactions transactions() {
+                return CLUSTER_NODES.get(0).transactions();
+            }
         };
     }
 
@@ -446,7 +451,7 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe
             var context = QueryContext.create(SqlQueryType.ALL, tx);
 
             return getAllFromCursor(
-                    await(queryEngine.querySingleAsync(sessionId, context, sql, args))
+                    await(queryEngine.querySingleAsync(sessionId, context, CLUSTER_NODES.get(0).transactions(), sql, args))
             );
         } finally {
             queryEngine.closeSession(sessionId);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
index 1a795c0325..bceb796de4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.sql.engine.util.QueryChecker.QueryTemplate;
 import org.apache.ignite.internal.sql.engine.util.TestQueryProcessor;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.provider.Arguments;
@@ -125,6 +126,11 @@ public abstract class BaseDataTypeTest<T extends Comparable<T>> extends ClusterP
                 return ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
             }
 
+            @Override
+            protected IgniteTransactions transactions() {
+                return igniteTx();
+            }
+
             @Override
             protected void checkMetadata(AsyncSqlCursor<?> cursor) {
                 Optional<ColumnMetadata> testKey = cursor.metadata().columns()
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index bbc51f28bd..9828499320 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -555,7 +555,6 @@ public class IgniteImpl implements Ignite {
                 indexManager,
                 schemaManager,
                 dataStorageMgr,
-                txManager,
                 distributionZoneManager,
                 () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
                 replicaSvc,
@@ -564,7 +563,7 @@ public class IgniteImpl implements Ignite {
                 metricManager
         );
 
-        sql = new IgniteSqlImpl(qryEngine);
+        sql = new IgniteSqlImpl(qryEngine, new IgniteTransactionsImpl(txManager));
 
         var deploymentManagerImpl = new DeploymentManagerImpl(
                 clusterSvc,
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
index a62a433cf1..b78178768a 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.session.SessionInfo;
+import org.apache.ignite.tx.IgniteTransactions;
 
 /**
  * {@link QueryProcessor} that handles test {@link NativeTypeWrapper native type wrappers} .
@@ -71,11 +72,15 @@ public final class TestQueryProcessor implements QueryProcessor {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(SessionId sessionId, QueryContext context, String qry,
-            Object... params) {
-
+    public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
+            SessionId sessionId,
+            QueryContext context,
+            IgniteTransactions transactions,
+            String qry,
+            Object... params
+    ) {
         Object[] unwrappedParams = Arrays.stream(params).map(NativeTypeWrapper::unwrap).toArray();
 
-        return queryProcessor.querySingleAsync(sessionId, context, qry, unwrappedParams);
+        return queryProcessor.querySingleAsync(sessionId, context, transactions, qry, unwrappedParams);
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index 6f164001b2..8f674bfe17 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -24,6 +24,7 @@ import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.Session.SessionBuilder;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.IgniteTransactions;
 
 /**
  * Embedded implementation of the Ignite SQL query facade.
@@ -31,13 +32,17 @@ import org.apache.ignite.sql.Statement.StatementBuilder;
 public class IgniteSqlImpl implements IgniteSql {
     private final QueryProcessor qryProc;
 
+    private final IgniteTransactions transactions;
+
     /**
      * Constructor.
      *
      * @param qryProc Query processor.
+     * @param transactions Transactions facade.
      */
-    public IgniteSqlImpl(QueryProcessor qryProc) {
+    public IgniteSqlImpl(QueryProcessor qryProc, IgniteTransactions transactions) {
         this.qryProc = qryProc;
+        this.transactions = transactions;
     }
 
     /** {@inheritDoc} */
@@ -49,7 +54,7 @@ public class IgniteSqlImpl implements IgniteSql {
     /** {@inheritDoc} */
     @Override
     public SessionBuilder sessionBuilder() {
-        return new SessionBuilderImpl(qryProc, new HashMap<>());
+        return new SessionBuilderImpl(qryProc, transactions, new HashMap<>());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
index 352db1c689..3ed4ca61b2 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.session.SessionProperty;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.Session.SessionBuilder;
+import org.apache.ignite.tx.IgniteTransactions;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -40,6 +41,8 @@ public class SessionBuilderImpl implements SessionBuilder {
 
     private final QueryProcessor qryProc;
 
+    private final IgniteTransactions transactions;
+
     private long queryTimeout = DEFAULT_QUERY_TIMEOUT;
 
     private long sessionTimeout = DEFAULT_SESSION_TIMEOUT;
@@ -54,10 +57,12 @@ public class SessionBuilderImpl implements SessionBuilder {
      * Session builder constructor.
      *
      * @param qryProc SQL query processor.
+     * @param transactions Transactions facade.
      * @param props Initial properties.
      */
-    SessionBuilderImpl(QueryProcessor qryProc, Map<String, Object> props) {
+    SessionBuilderImpl(QueryProcessor qryProc, IgniteTransactions transactions, Map<String, Object> props) {
         this.qryProc = qryProc;
+        this.transactions = transactions;
         this.props = props;
     }
 
@@ -145,6 +150,7 @@ public class SessionBuilderImpl implements SessionBuilder {
         return new SessionImpl(
                 sessionId,
                 qryProc,
+                transactions,
                 pageSize,
                 propsHolder
         );
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 638ad0495b..fca3f6ae50 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -59,6 +59,7 @@ import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.sql.reactive.ReactiveResultSet;
 import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -73,6 +74,8 @@ public class SessionImpl implements AbstractSession {
 
     private final QueryProcessor qryProc;
 
+    private final IgniteTransactions transactions;
+
     private final SessionId sessionId;
 
     private final int pageSize;
@@ -83,16 +86,19 @@ public class SessionImpl implements AbstractSession {
      * Constructor.
      *
      * @param qryProc Query processor.
+     * @param transactions Transactions facade.
      * @param pageSize Query fetch page size.
      * @param props Session's properties.
      */
     SessionImpl(
             SessionId sessionId,
             QueryProcessor qryProc,
+            IgniteTransactions transactions,
             int pageSize,
             PropertiesHolder props
     ) {
         this.qryProc = qryProc;
+        this.transactions = transactions;
         this.sessionId = sessionId;
         this.pageSize = pageSize;
         this.props = props;
@@ -155,7 +161,7 @@ public class SessionImpl implements AbstractSession {
             propertyMap.put(entry.getKey().name, entry.getValue());
         }
 
-        return new SessionBuilderImpl(qryProc, propertyMap)
+        return new SessionBuilderImpl(qryProc, transactions, propertyMap)
                 .defaultPageSize(pageSize);
     }
 
@@ -174,7 +180,7 @@ public class SessionImpl implements AbstractSession {
         try {
             QueryContext ctx = QueryContext.create(SqlQueryType.ALL, transaction);
 
-            result = qryProc.querySingleAsync(sessionId, ctx, query, arguments)
+            result = qryProc.querySingleAsync(sessionId, ctx, transactions, query, arguments)
                     .thenCompose(cur -> cur.requestNextAsync(pageSize)
                             .thenApply(
                                     batchRes -> new AsyncResultSetImpl<>(
@@ -251,7 +257,7 @@ public class SessionImpl implements AbstractSession {
                 Object[] args = batch.get(i).toArray();
 
                 final var qryFut = tail
-                        .thenCompose(v -> qryProc.querySingleAsync(sessionId, ctx, query, args));
+                        .thenCompose(v -> qryProc.querySingleAsync(sessionId, ctx, transactions, query, args));
 
                 tail = qryFut.thenCompose(cur -> cur.requestNextAsync(1))
                         .thenAccept(page -> {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 99251fa8cd..5d1baead5c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.sql.engine;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteExceptionMapperUtil;
 import org.apache.ignite.sql.ResultSetMetadata;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Sql query cursor.
@@ -33,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
 public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     private final SqlQueryType queryType;
     private final ResultSetMetadata meta;
-    private final @Nullable InternalTransaction implicitTx;
+    private final QueryTransactionWrapper txWrapper;
     private final AsyncCursor<T> dataCursor;
 
     /**
@@ -46,12 +44,12 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     public AsyncSqlCursorImpl(
             SqlQueryType queryType,
             ResultSetMetadata meta,
-            @Nullable InternalTransaction implicitTx,
+            QueryTransactionWrapper txWrapper,
             AsyncCursor<T> dataCursor
     ) {
         this.queryType = queryType;
         this.meta = meta;
-        this.implicitTx = implicitTx;
+        this.txWrapper = txWrapper;
         this.dataCursor = dataCursor;
     }
 
@@ -72,16 +70,14 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {
         return dataCursor.requestNextAsync(rows).handle((batch, t) -> {
             if (t != null) {
-                if (implicitTx != null) {
-                    implicitTx.rollback();
-                }
+                txWrapper.rollbackImplicit();
 
                 throw new CompletionException(wrapIfNecessary(t));
             }
 
-            if (implicitTx != null && !batch.hasMore()) {
+            if (!batch.hasMore()) {
                 // last batch, need to commit transaction
-                implicitTx.commit();
+                txWrapper.commitImplicit();
             }
 
             return batch;
@@ -92,9 +88,8 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     @Override
     public CompletableFuture<Void> closeAsync() {
         // Commit implicit transaction, if any.
-        if (implicitTx != null) {
-            implicitTx.commit();
-        }
+        txWrapper.commitImplicit();
+
         return dataCursor.closeAsync();
     }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 92bacfd50d..b52ef56d34 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.session.SessionInfo;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.tx.IgniteTransactions;
 
 /**
  * QueryProcessor interface.
@@ -61,8 +62,10 @@ public interface QueryProcessor extends IgniteComponent {
      *
      * <p>If the query string contains more than one statement the IgniteException will be thrown.
      *
+     * @param sessionId A session identifier.
      * @param context User query context.
-     * @param qry Single statement SQL query .
+     * @param transactions Transactions facade.
+     * @param qry Single statement SQL query.
      * @param params Query parameters.
      * @return Sql cursor.
      *
@@ -71,6 +74,7 @@ public interface QueryProcessor extends IgniteComponent {
     CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
             SessionId sessionId,
             QueryContext context,
+            IgniteTransactions transactions,
             String qry,
             Object... params
     );
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
new file mode 100644
index 0000000000..e39be75076
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import org.apache.ignite.internal.tx.InternalTransaction;
+
+/**
+ * Wrapper for the transaction that encapsulates the management of an implicit transaction.
+ */
+public class QueryTransactionWrapper {
+    private final boolean implicit;
+
+    private final InternalTransaction transaction;
+
+    QueryTransactionWrapper(InternalTransaction transaction, boolean implicit) {
+        this.transaction = transaction;
+        this.implicit = implicit;
+    }
+
+    /**
+     * Unwrap transaction.
+     */
+    InternalTransaction unwrap() {
+        return transaction;
+    }
+
+    /**
+     * Commits an implicit transaction, if one has been started.
+     */
+    void commitImplicit() {
+        if (implicit) {
+            transaction.commit();
+        }
+    }
+
+    /**
+     * Rolls back an implicit transaction, if one has been started.
+     */
+    void rollbackImplicit() {
+        if (implicit) {
+            transaction.rollback();
+        }
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 647cc4e934..33445d9aac 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -31,7 +31,6 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.function.Supplier;
@@ -69,6 +68,7 @@ import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
@@ -91,7 +91,6 @@ import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -99,11 +98,13 @@ import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.SchemaNotFoundException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.TransactionOptions;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
- *  SqlQueryProcessor.
+ * SqlQueryProcessor.
  *  TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class SqlQueryProcessor implements QueryProcessor {
@@ -172,9 +173,6 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private volatile SqlSchemaManager sqlSchemaManager;
 
-    /** Transaction manager. */
-    private final TxManager txManager;
-
     /** Distribution zones manager. */
     private final DistributionZoneManager distributionZoneManager;
 
@@ -198,7 +196,6 @@ public class SqlQueryProcessor implements QueryProcessor {
             IndexManager indexManager,
             SchemaManager schemaManager,
             DataStorageManager dataStorageManager,
-            TxManager txManager,
             DistributionZoneManager distributionZoneManager,
             Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier,
             ReplicaService replicaService,
@@ -212,7 +209,6 @@ public class SqlQueryProcessor implements QueryProcessor {
         this.indexManager = indexManager;
         this.schemaManager = schemaManager;
         this.dataStorageManager = dataStorageManager;
-        this.txManager = txManager;
         this.distributionZoneManager = distributionZoneManager;
         this.dataStorageFieldsSupplier = dataStorageFieldsSupplier;
         this.replicaService = replicaService;
@@ -367,14 +363,18 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
-            SessionId sessionId, QueryContext context, String qry, Object... params
+            SessionId sessionId,
+            QueryContext context,
+            IgniteTransactions transactions,
+            String qry,
+            Object... params
     ) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
         }
 
         try {
-            return querySingle0(sessionId, context, qry, params);
+            return querySingle0(sessionId, context, transactions, qry, params);
         } finally {
             busyLock.leaveBusy();
         }
@@ -401,6 +401,7 @@ public class SqlQueryProcessor implements QueryProcessor {
     private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
             SessionId sessionId,
             QueryContext context,
+            IgniteTransactions transactions,
             String sql,
             Object... params
     ) {
@@ -429,93 +430,120 @@ public class SqlQueryProcessor implements QueryProcessor {
             return CompletableFuture.failedFuture(new SessionNotFoundException(sessionId));
         }
 
-        CompletableFuture<Void> start = new CompletableFuture<>();
+        CompletableFuture<AsyncSqlCursor<List<Object>>> start = new CompletableFuture<>();
 
-        AtomicReference<InternalTransaction> tx = new AtomicReference<>();
+        CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start.thenCompose(ignored -> {
+            ParsedResult result = parserService.parse(sql);
 
-        CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
-                .thenCompose(ignored -> {
-                    ParsedResult result = parserService.parse(sql);
+            validateParsedStatement(context, result, params);
 
-                    validateParsedStatement(context, outerTx, result, params);
+            QueryTransactionWrapper txWrapper = wrapTxOrStartImplicit(result.queryType(), transactions, outerTx);
 
-                    boolean rwOp = dataModificationOp(result);
+            return waitForActualSchema(schemaName, txWrapper.unwrap().startTimestamp())
+                    .thenCompose(schema -> {
+                        BaseQueryContext ctx = BaseQueryContext.builder()
+                                .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+                                .logger(LOG)
+                                .cancel(queryCancel)
+                                .parameters(params).build();
 
-                    boolean implicitTxRequired = outerTx == null;
+                        return prepareSvc.prepareAsync(result, ctx).thenApply(plan -> executePlan(session, txWrapper, ctx, plan));
+                    }).whenComplete((res, ex) -> {
+                        if (ex != null) {
+                            txWrapper.rollbackImplicit();
+                        }
+                    });
+        });
 
-                    InternalTransaction currentTx = implicitTxRequired ? txManager.begin(!rwOp, null) : outerTx;
+        // TODO IGNITE-20078 Improve (or remove) CancellationException handling.
+        stage.whenComplete((cur, ex) -> {
+            if (ex instanceof CancellationException) {
+                queryCancel.cancel();
+            }
+        });
 
-                    tx.set(currentTx);
+        start.completeAsync(() -> null, taskExecutor);
 
-                    // TODO IGNITE-18733: wait for actual metadata for TX.
-                    HybridTimestamp txTimestamp = currentTx.startTimestamp();
+        return stage;
+    }
 
-                    SchemaPlus schema = sqlSchemaManager.schema(schemaName, txTimestamp.longValue());
+    private CompletableFuture<SchemaPlus> waitForActualSchema(String schemaName, HybridTimestamp timestamp) {
+        try {
+            // TODO IGNITE-18733: wait for actual metadata for TX.
+            SchemaPlus schema = sqlSchemaManager.schema(schemaName, timestamp.longValue());
 
-                    if (schema == null) {
-                        return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName));
-                    }
+            if (schema == null) {
+                return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName));
+            }
 
-                    BaseQueryContext ctx = BaseQueryContext.builder()
-                            .frameworkConfig(
-                                    Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                                            .defaultSchema(schema)
-                                            .build()
-                            )
-                            .logger(LOG)
-                            .cancel(queryCancel)
-                            .parameters(params)
-                            .build();
-
-                    return prepareSvc.prepareAsync(result, ctx)
-                            .thenApply(plan -> {
-                                var dataCursor = executionSrvc.executePlan(tx.get(), plan, ctx);
-
-                                SqlQueryType queryType = plan.type();
-                                assert queryType != null : "Expected a full plan but got a fragment: " + plan;
-
-                                numberOfOpenCursors.incrementAndGet();
-
-                                return new AsyncSqlCursorImpl<>(
-                                        queryType,
-                                        plan.metadata(),
-                                        implicitTxRequired ? tx.get() : null,
-                                        new AsyncCursor<List<Object>>() {
-                                            @Override
-                                            public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
-                                                session.touch();
-
-                                                return dataCursor.requestNextAsync(rows);
-                                            }
-
-                                            @Override
-                                            public CompletableFuture<Void> closeAsync() {
-                                                session.touch();
-                                                numberOfOpenCursors.decrementAndGet();
-
-                                                return dataCursor.closeAsync();
-                                            }
-                                        }
-                                );
-                            });
-                });
+            return CompletableFuture.completedFuture(schema);
+        } catch (Throwable t) {
+            return CompletableFuture.failedFuture(t);
+        }
+    }
 
-        stage.whenComplete((cur, ex) -> {
-            if (ex instanceof CancellationException) {
-                queryCancel.cancel();
-            }
+    private AsyncSqlCursor<List<Object>> executePlan(
+            Session session,
+            QueryTransactionWrapper txWrapper,
+            BaseQueryContext ctx,
+            QueryPlan plan
+    ) {
+        var dataCursor = executionSrvc.executePlan(txWrapper.unwrap(), plan, ctx);
+
+        SqlQueryType queryType = plan.type();
+        assert queryType != null : "Expected a full plan but got a fragment: " + plan;
 
-            if (ex != null && outerTx == null) {
-                InternalTransaction tx0 = tx.get();
-                if (tx0 != null) {
-                    tx0.rollback();
+        numberOfOpenCursors.incrementAndGet();
+
+        return new AsyncSqlCursorImpl<>(
+                queryType,
+                plan.metadata(),
+                txWrapper,
+                new AsyncCursor<List<Object>>() {
+                    @Override
+                    public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
+                        session.touch();
+
+                        return dataCursor.requestNextAsync(rows);
+                    }
+
+                    @Override
+                    public CompletableFuture<Void> closeAsync() {
+                        session.touch();
+                        numberOfOpenCursors.decrementAndGet();
+
+                        return dataCursor.closeAsync();
+                    }
                 }
-            }
-        });
+        );
+    }
 
-        start.completeAsync(() -> null, taskExecutor);
+    /**
+     * Creates a new transaction wrapper using an existing outer transaction or starting a new "implicit" transaction.
+     *
+     * @param queryType Query type.
+     * @param transactions Transactions facade.
+     * @param outerTx Outer transaction.
+     * @return Wrapper for an active transaction.
+     * @throws SqlException If an outer transaction was started for a {@link SqlQueryType#DDL DDL} query.
+     */
+    static QueryTransactionWrapper wrapTxOrStartImplicit(
+            SqlQueryType queryType,
+            IgniteTransactions transactions,
+            @Nullable InternalTransaction outerTx
+    ) {
+        if (outerTx == null) {
+            InternalTransaction tx = (InternalTransaction) transactions.begin(
+                    new TransactionOptions().readOnly(queryType != SqlQueryType.DML));
 
-        return stage;
+            return new QueryTransactionWrapper(tx, true);
+        }
+
+        if (SqlQueryType.DDL == queryType) {
+            throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions.");
+        }
+
+        return new QueryTransactionWrapper(outerTx, false);
     }
 
     @TestOnly
@@ -548,11 +576,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableCreated(
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
-                    DEFAULT_SCHEMA_NAME,
-                    parameters.tableId(),
-                    parameters.causalityToken()
-            )
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+                            DEFAULT_SCHEMA_NAME,
+                            parameters.tableId(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -566,11 +594,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableUpdated(
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
-                    DEFAULT_SCHEMA_NAME,
-                    parameters.tableId(),
-                    parameters.causalityToken()
-            )
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+                            DEFAULT_SCHEMA_NAME,
+                            parameters.tableId(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -584,11 +612,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableDropped(
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
-                    DEFAULT_SCHEMA_NAME,
-                    parameters.tableId(),
-                    parameters.causalityToken()
-            )
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+                            DEFAULT_SCHEMA_NAME,
+                            parameters.tableId(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -602,12 +630,12 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(IndexEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onIndexDropped(
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
-                    DEFAULT_SCHEMA_NAME,
-                    parameters.tableId(),
-                    parameters.indexId(),
-                    parameters.causalityToken()
-            )
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+                            DEFAULT_SCHEMA_NAME,
+                            parameters.tableId(),
+                            parameters.indexId(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
@@ -621,24 +649,18 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(IndexEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onIndexCreated(
-                    parameters.tableId(),
-                    parameters.indexId(),
-                    parameters.indexDescriptor(),
-                    parameters.causalityToken()
-            )
+                            parameters.tableId(),
+                            parameters.indexId(),
+                            parameters.indexDescriptor(),
+                            parameters.causalityToken()
+                    )
                     .thenApply(v -> false);
         }
     }
 
-    /** Returns {@code true} if this is data modification operation. */
-    private static boolean dataModificationOp(ParsedResult parsedResult) {
-        return parsedResult.queryType() == SqlQueryType.DML;
-    }
-
     /** Performs additional validation of a parsed statement. **/
     private static void validateParsedStatement(
             QueryContext context,
-            @Nullable InternalTransaction outerTx,
             ParsedResult parsedResult,
             Object[] params
     ) {
@@ -651,10 +673,6 @@ public class SqlQueryProcessor implements QueryProcessor {
             throw new QueryValidationException(message);
         }
 
-        if (SqlQueryType.DDL == queryType && outerTx != null) {
-            throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions.");
-        }
-
         if (parsedResult.dynamicParamsCount() != params.length) {
             String message = format(
                     "Unexpected number of query parameters. Provided {} but there is only {} dynamic parameter(s).",
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
index dec2b80a98..eb30c61bcb 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
@@ -21,7 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Collections;
 import java.util.List;
@@ -48,70 +47,66 @@ public class AsyncSqlCursorImplTest {
     private static final ResultSetMetadata RESULT_SET_METADATA = new ResultSetMetadataImpl(Collections.emptyList());
 
     /** Cursor should trigger commit of implicit transaction (if any) only if data is fully read. */
-    @ParameterizedTest
+    @ParameterizedTest(name = "{0}")
     @MethodSource("transactions")
-    public void testTriggerCommitAfterDataIsFullyRead(NoOpTransaction implicitTx) {
+    public void testTriggerCommitAfterDataIsFullyRead(boolean implicit, QueryTransactionWrapper txWrapper) {
         List<Integer> list = List.of(1, 2, 3);
 
-        AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx,
+        AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper,
                 new AsyncWrapper<>(CompletableFuture.completedFuture(list.iterator()), Runnable::run));
 
         int requestRows = 2;
         BatchedResult<Integer> in1 = cursor.requestNextAsync(requestRows).join();
         assertEquals(in1.items(), list.subList(0, requestRows));
 
-        if (implicitTx != null) {
-            CompletableFuture<Void> f = implicitTx.commitFuture();
-            assertFalse(f.isDone(), "Implicit transaction should have not been committed because there is more data.");
-        }
+        assertFalse(((NoOpTransaction) txWrapper.unwrap()).commitFuture().isDone(),
+                "Implicit transaction should have not been committed because there is more data.");
 
         BatchedResult<Integer> in2 = cursor.requestNextAsync(requestRows).join();
         assertEquals(in2.items(), list.subList(requestRows, list.size()));
 
-        if (implicitTx != null) {
-            CompletableFuture<Void> f = implicitTx.commitFuture();
-            assertTrue(f.isDone(), "Implicit transaction should been committed because there is no more data");
-        }
+        CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).commitFuture();
+        assertEquals(implicit, f.isDone(), "Implicit transaction should been committed because there is no more data");
     }
 
     /** Exception on read should trigger rollback of implicit transaction, if any. */
-    @ParameterizedTest
+    @ParameterizedTest(name = "{0}")
     @MethodSource("transactions")
-    public void testExceptionRollbacksImplicitTx(NoOpTransaction implicitTx) {
+    public void testExceptionRollbacksImplicitTx(boolean implicit, QueryTransactionWrapper txWrapper) {
         IgniteException err = new IgniteException(Common.INTERNAL_ERR);
 
-        AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx,
+        AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper,
                 new AsyncWrapper<>(CompletableFuture.failedFuture(err), Runnable::run));
 
         CompletionException t = assertThrows(CompletionException.class, () -> cursor.requestNextAsync(1).join());
 
-        if (implicitTx != null) {
-            CompletableFuture<Void> f = implicitTx.rollbackFuture();
-            assertTrue(f.isDone(), "Implicit transaction should have been rolled back: " + f);
-        }
+        CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).rollbackFuture();
+        assertEquals(implicit, f.isDone(), "Implicit transaction should have been rolled back: " + f);
 
         IgniteException igniteErr = assertInstanceOf(IgniteException.class, t.getCause());
         assertEquals(err.codeAsString(), igniteErr.codeAsString());
     }
 
     /** Cursor close should trigger commit of implicit transaction, if any. */
-    @ParameterizedTest
+    @ParameterizedTest(name = "{0}")
     @MethodSource("transactions")
-    public void testCloseCommitsImplicitTx(NoOpTransaction implicitTx) {
+    public void testCloseCommitsImplicitTx(boolean implicit, QueryTransactionWrapper txWrapper) {
         AsyncCursor<Integer> data = new AsyncWrapper<>(List.of(1, 2, 3, 4).iterator());
-        AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, data);
+        AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper, data);
         cursor.closeAsync().join();
 
-        if (implicitTx != null) {
-            CompletableFuture<Void> f = implicitTx.commitFuture();
-            assertTrue(f.isDone(), "Implicit transaction should have been committed: " + f);
-        }
+        CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).commitFuture();
+        assertEquals(implicit, f.isDone(), "Implicit transaction should have been committed: " + f);
     }
 
     private static Stream<Arguments> transactions() {
         return Stream.of(
-                Arguments.of(Named.named("implicit-tx", NoOpTransaction.readOnly("TX"))),
-                Arguments.of(Named.named("no implicit-tx", null))
+                Arguments.of(Named.named("implicit-tx", true), newTxWrapper(true)),
+                Arguments.of(Named.named("explicit-tx", false), newTxWrapper(false))
         );
     }
+
+    private static QueryTransactionWrapper newTxWrapper(boolean implicit) {
+        return new QueryTransactionWrapper(NoOpTransaction.readOnly("TX"), implicit);
+    }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
new file mode 100644
index 0000000000..f3c9d0e659
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.sql.engine.SqlQueryProcessor.wrapTxOrStartImplicit;
+import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.EnumSet;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.ErrorGroups;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for class {@link QueryTransactionWrapper}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class QueryTransactionWrapperSelfTest extends BaseIgniteAbstractTest {
+    @Mock
+    private IgniteTransactions transactions;
+
+    @Test
+    public void throwsExceptionForDdlWithExternalTransaction() {
+        //noinspection ThrowableNotThrown
+        assertThrowsSqlException(ErrorGroups.Sql.STMT_VALIDATION_ERR,
+                () -> wrapTxOrStartImplicit(SqlQueryType.DDL, transactions, new NoOpTransaction("test")));
+        verifyNoInteractions(transactions);
+    }
+
+    @Test
+    public void testImplicitTransactionAttributes() {
+        when(transactions.begin(any())).thenAnswer(
+                inv -> {
+                    boolean readOnly = inv.getArgument(0, TransactionOptions.class).readOnly();
+
+                    return readOnly ? NoOpTransaction.readOnly("test-ro") : NoOpTransaction.readWrite("test-rw");
+                }
+        );
+
+        assertThat(wrapTxOrStartImplicit(SqlQueryType.DML, transactions, null).unwrap().isReadOnly(), equalTo(false));
+
+        for (SqlQueryType type : EnumSet.complementOf(EnumSet.of(SqlQueryType.DML))) {
+            assertThat(wrapTxOrStartImplicit(type, transactions, null).unwrap().isReadOnly(), equalTo(true));
+        }
+
+        verify(transactions, times(SqlQueryType.values().length)).begin(any());
+        verifyNoMoreInteractions(transactions);
+    }
+
+    @Test
+    public void commitAndRollbackNotAffectExternalTransaction() {
+        NoOpTransaction externalTx = new NoOpTransaction("test");
+
+        QueryTransactionWrapper wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, externalTx);
+        wrapper.commitImplicit();
+        assertFalse(externalTx.commitFuture().isDone());
+
+        wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, externalTx);
+        wrapper.rollbackImplicit();
+        assertFalse(externalTx.commitFuture().isDone());
+
+        verifyNoInteractions(transactions);
+    }
+
+    @Test
+    public void testCommitImplicit() {
+        QueryTransactionWrapper wrapper = prepareImplicitTx();
+        NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap();
+
+        wrapper.commitImplicit();
+
+        assertThat(tx.commitFuture().isDone(), equalTo(true));
+        assertThat(tx.rollbackFuture().isDone(), equalTo(false));
+    }
+
+    @Test
+    public void testRollbackImplicit() {
+        QueryTransactionWrapper wrapper = prepareImplicitTx();
+        NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap();
+
+        wrapper.rollbackImplicit();
+
+        assertThat(tx.rollbackFuture().isDone(), equalTo(true));
+        assertThat(tx.commitFuture().isDone(), equalTo(false));
+    }
+
+    private QueryTransactionWrapper prepareImplicitTx() {
+        when(transactions.begin(any())).thenReturn(new NoOpTransaction("test"));
+
+        QueryTransactionWrapper wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, null);
+
+        assertThat(wrapper.unwrap(), instanceOf(NoOpTransaction.class));
+        NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap();
+
+        assertFalse(tx.rollbackFuture().isDone());
+        assertFalse(tx.commitFuture().isDone());
+
+        return wrapper;
+    }
+}
+
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 1fbf38976d..98aab74fb0 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -76,7 +75,6 @@ import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteException;
@@ -87,6 +85,7 @@ import org.apache.ignite.network.ClusterNodeImpl;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -122,7 +121,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
     private MessagingService msgSrvc;
 
     @Mock
-    private TxManager txManager;
+    private IgniteTransactions transactions;
 
     @Mock
     private DistributionZoneManager distributionZoneManager;
@@ -235,7 +234,6 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
                 indexManager,
                 schemaManager,
                 dataStorageManager,
-                txManager,
                 distributionZoneManager,
                 Map::of,
                 mock(ReplicaService.class),
@@ -256,7 +254,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
         when(tbl.storage()).thenReturn(mock(MvTableStorage.class));
         when(tbl.storage().getTableDescriptor()).thenReturn(new StorageTableDescriptor(tblId, 1, "none"));
 
-        when(txManager.begin(anyBoolean(), any())).thenReturn(new NoOpTransaction(localNode.name()));
+        when(transactions.begin(any())).thenReturn(new NoOpTransaction(localNode.name()));
 
         qryProc.start();
 
@@ -268,6 +266,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
         var cursors = qryProc.querySingleAsync(
                 sessionId,
                 context,
+                transactions,
                 "SELECT * FROM TEST"
         );
 
@@ -293,6 +292,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
         assertTrue(assertThrows(IgniteInternalException.class, () -> qryProc.querySingleAsync(
                 sessionId,
                 context,
+                transactions,
                 "SELECT 1"
         )).getCause() instanceof NodeStoppingException);
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index d698e996d7..e675871303 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -105,13 +105,13 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterNodeImpl;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -158,6 +158,10 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     @Mock(lenient = true)
     private TxManager tm;
 
+    /** Ignite transactions. */
+    @Mock(lenient = true)
+    private IgniteTransactions transactions;
+
     /** Meta storage manager. */
     @Mock
     MetaStorageManager msm;
@@ -204,13 +208,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
     SqlQueryProcessor queryProc;
 
-    /** Test node. */
-    private final ClusterNode node = new ClusterNodeImpl(
-            UUID.randomUUID().toString(),
-            NODE_NAME,
-            new NetworkAddress("127.0.0.1", 2245)
-    );
-
     @InjectConfiguration
     private RocksDbStorageEngineConfiguration rocksDbEngineConfig;
 
@@ -326,7 +323,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 idxManager,
                 schemaManager,
                 dataStorageManager,
-                tm,
                 distributionZoneManager,
                 () -> dataStorageModules.collectSchemasFields(
                         List.of(
@@ -374,17 +370,12 @@ public class MockedStructuresTest extends IgniteAbstractTest {
      */
     @Test
     public void testCreateTable() {
-        SqlQueryProcessor finalQueryProc = queryProc;
-
-        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
-        QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
         String curMethodName = getCurrentMethodName();
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
                 + "with primary_zone='%s'", curMethodName, ZONE_NAME);
 
-        readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
+        readFirst(sql(newTblSql));
 
         assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
                 .equalsIgnoreCase(curMethodName)));
@@ -392,23 +383,23 @@ public class MockedStructuresTest extends IgniteAbstractTest {
         String finalNewTblSql1 = newTblSql;
 
         assertThrows(TableAlreadyExistsException.class,
-                () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql1)));
+                () -> readFirst(sql(finalNewTblSql1)));
 
         String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1 int PRIMARY KEY, c2 varbinary(255)) "
                 + "with primary_zone='%s'", curMethodName, ZONE_NAME);
 
         assertThrows(TableAlreadyExistsException.class,
-                () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql2)));
+                () -> readFirst(sql(finalNewTblSql2)));
 
-        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
+        assertThrows(SqlException.class, () -> readFirst(sql(
                 "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions__wrong=1,primary_zone='"
                         + ZONE_NAME + "'")));
 
-        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
+        assertThrows(SqlException.class, () -> readFirst(sql(
                 "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas__wrong=1,primary_zone='"
                         + ZONE_NAME + "'")));
 
-        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
+        assertThrows(SqlException.class, () -> readFirst(sql(
                 "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone__wrong='"
                         + ZONE_NAME + "'")));
 
@@ -417,7 +408,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         String finalNewTblSql3 = newTblSql;
 
-        assertDoesNotThrow(() -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql3)));
+        assertDoesNotThrow(() -> readFirst(sql(finalNewTblSql3)));
     }
 
     /**
@@ -427,28 +418,25 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     public void testCreateTableWithDistributionZone() {
         String tableName = getCurrentMethodName().toUpperCase();
 
-        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
-        QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) ",
                  tableName);
 
-        readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
+        readFirst(sql(newTblSql));
 
         assertEquals(getZoneId(DEFAULT_ZONE_NAME), tblsCfg.tables().get(tableName).zoneId().value());
 
-        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName));
+        readFirst(sql("DROP TABLE " + tableName));
 
         int zoneId = dstZnsCfg.distributionZones().get(ZONE_NAME).zoneId().value();
 
         newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
                 + "with primary_zone='%s'", tableName, ZONE_NAME);
 
-        readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
+        readFirst(sql(newTblSql));
 
         assertEquals(zoneId, tblsCfg.tables().get(tableName).zoneId().value());
 
-        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName));
+        readFirst(sql("DROP TABLE " + tableName));
 
         log.info("Creating a table with a non-existent distribution zone.");
 
@@ -458,8 +446,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         Throwable exception = assertThrows(
                 Throwable.class,
-                () -> readFirst(queryProc.querySingleAsync(sessionId, context,
-                        String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
+                () -> readFirst(sql(String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
                                 + "with primary_zone='%s'", tableName, nonExistZone)))
         );
 
@@ -473,29 +460,21 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     public void testDropTable() {
         String curMethodName = getCurrentMethodName();
 
-        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
-        QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName);
 
-        readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
-
-        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + curMethodName));
+        readFirst(sql(newTblSql));
 
-        SqlQueryProcessor finalQueryProc = queryProc;
+        readFirst(sql("DROP TABLE " + curMethodName));
 
-        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
-                "DROP TABLE " + curMethodName + "_not_exist")));
+        assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE " + curMethodName + "_not_exist")));
 
-        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
-                "DROP TABLE " + curMethodName)));
+        assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE " + curMethodName)));
 
-        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
-                "DROP TABLE PUBLIC." + curMethodName)));
+        assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE PUBLIC." + curMethodName)));
 
-        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist"));
+        readFirst(sql("DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist"));
 
-        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName));
+        readFirst(sql("DROP TABLE IF EXISTS PUBLIC." + curMethodName));
 
         assertTrue(tblManager.tables().stream().noneMatch(t -> t.name()
                 .equalsIgnoreCase("PUBLIC." + curMethodName)));
@@ -505,12 +484,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     void createTableWithTableOptions() {
         String method = getCurrentMethodName();
 
-        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
-        QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
-        assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync(
-                sessionId,
-                context,
+        assertDoesNotThrow(() -> readFirst(sql(
                 String.format(
                         "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone='%s'",
                         method + 4,
@@ -520,9 +494,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         IgniteException exception = assertThrows(
                 IgniteException.class,
-                () -> readFirst(queryProc.querySingleAsync(
-                        sessionId,
-                        context,
+                () -> readFirst(sql(
                         String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) WITH %s='%s'", method + 6, method, method)
                 ))
         );
@@ -584,6 +556,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
         InternalTransaction tx = mock(InternalTransaction.class);
         when(tx.startTimestamp()).thenReturn(HybridTimestamp.MAX_VALUE);
         when(tm.begin(anyBoolean(), any())).thenReturn(tx);
+        when(transactions.begin(any())).thenReturn(tx);
 
         when(replicaManager.stopReplica(any())).thenReturn(completedFuture(true));
 
@@ -630,4 +603,11 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     private int getZoneId(String zoneName) {
         return getZoneIdStrict(dstZnsCfg, zoneName);
     }
+
+    private CompletableFuture<AsyncSqlCursor<List<Object>>> sql(String query) {
+        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
+        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
+        return queryProc.querySingleAsync(sessionId, context, transactions, query);
+    }
 }
diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
index d8974861d8..a676e3cdc2 100644
--- a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
+++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matcher;
@@ -474,7 +475,7 @@ public abstract class QueryChecker {
             if (!CollectionUtils.nullOrEmpty(planMatchers) || exactPlan != null) {
 
                 CompletableFuture<AsyncSqlCursor<List<Object>>> explainCursors = qryProc.querySingleAsync(sessionId,
-                        context, "EXPLAIN PLAN FOR " + qry, params);
+                        context, transactions(), "EXPLAIN PLAN FOR " + qry, params);
                 AsyncSqlCursor<List<Object>> explainCursor = await(explainCursors);
                 List<List<Object>> explainRes = getAllFromCursor(explainCursor);
 
@@ -491,7 +492,9 @@ public abstract class QueryChecker {
                 }
             }
             // Check result.
-            CompletableFuture<AsyncSqlCursor<List<Object>>> cursors = qryProc.querySingleAsync(sessionId, context, qry, params);
+            CompletableFuture<AsyncSqlCursor<List<Object>>> cursors =
+                    qryProc.querySingleAsync(sessionId, context, transactions(), qry, params);
+
             AsyncSqlCursor<List<Object>> cur = await(cursors);
 
             checkMetadata(cur);
@@ -553,6 +556,8 @@ public abstract class QueryChecker {
 
     protected abstract QueryProcessor getEngine();
 
+    protected abstract IgniteTransactions transactions();
+
     protected void checkMetadata(AsyncSqlCursor<?> cursor) {
 
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 4005572fcc..056ec46767 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -76,7 +76,7 @@ abstract class AbstractTableView {
             return (IgniteException) th;
         }
 
-        //TODO: IGNITE-14500 Replace with public exception with an error code (or unwrap?).
+        //TODO: IGNITE-20181 KV/Binary view public API should only throw public exceptions for the end user.
         return new IgniteException(th);
     }
 }