You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2022/08/28 17:36:02 UTC

[ignite-3] branch main updated: IGNITE-17434 Fix partially correct previous implementation of sql implicit transactions (#990)

This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 757d70edf2 IGNITE-17434 Fix partially correct previous implementation of sql implicit transactions (#990)
757d70edf2 is described below

commit 757d70edf23d758a8847412a461184505bd35d5a
Author: Evgeniy Stanilovskiy <st...@gmail.com>
AuthorDate: Sun Aug 28 20:35:57 2022 +0300

    IGNITE-17434 Fix partially correct previous implementation of sql implicit transactions (#990)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |  3 ++
 .../runner/app/jdbc/ItJdbcBatchSelfTest.java       |  2 +-
 .../internal/sql/api/ItSqlAsynchronousApiTest.java | 17 +++---
 .../internal/sql/api/ItSqlSynchronousApiTest.java  | 14 +++--
 .../sql/engine/AbstractBasicIntegrationTest.java   | 23 ++++++--
 .../ignite/internal/sql/engine/ItDmlTest.java      | 61 +++++++++++++++++++++-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  1 +
 .../internal/sql/api/SessionBuilderImpl.java       |  2 +-
 .../internal/sql/engine/AsyncSqlCursorImpl.java    | 14 +++++
 .../internal/sql/engine/SqlQueryProcessor.java     | 20 ++++++-
 .../sql/engine/exec/ExecutionServiceImpl.java      | 10 ++--
 .../internal/sql/engine/exec/rel/AbstractNode.java |  2 +-
 .../internal/sql/engine/exec/rel/ModifyNode.java   | 21 ++++----
 .../sql/engine/property/PropertiesHolder.java      |  2 +-
 .../internal/sql/engine/util/BaseQueryContext.java | 19 ++++++-
 .../internal/sql/engine/StopCalciteModuleTest.java |  5 ++
 .../sql/engine/exec/MockedStructuresTest.java      |  3 +-
 .../sql/engine/session/SessionManagerTest.java     |  2 +-
 18 files changed, 175 insertions(+), 46 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 2a47869979..fcfb2c8e30 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -114,6 +114,9 @@ public class ErrorGroups {
 
         /** Cursor is already closed error. */
         public static final int CURSOR_CLOSED_ERR = SQL_ERR_GROUP.registerErrorCode(9);
+
+        /** Some keys can't be inserted because they violate the PK unique constraint. */
+        public static final int DUPLICATE_KEYS_ERR = SQL_ERR_GROUP.registerErrorCode(10);
     }
 
     /** Meta storage error group. */
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
index 0e56acffaa..00719e7f4b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
@@ -311,7 +311,7 @@ public class ItJdbcBatchSelfTest extends AbstractJdbcSelfTest {
                 assertEquals(i + 1, updCnts[i], "Invalid update count: " + i);
             }
 
-            if (!e.getMessage().contains("Failed to INSERT some keys because they are already in cache")) {
+            if (!e.getMessage().contains("PK unique constraint is violated")) {
                 log.error("Invalid exception: ", e);
 
                 fail();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 6f00082206..2ef0fa7466 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.sql.api;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -38,6 +36,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
@@ -56,8 +55,8 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.ColumnNotFoundException;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
@@ -595,13 +594,15 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
             }
         }
 
-        Throwable ex = assertThrowsWithCause(
-                () -> ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join(),
-                SqlBatchException.class
+        CompletionException ex = assertThrows(
+                CompletionException.class,
+                () -> ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join()
         );
-        assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache"));
-        SqlBatchException batchEx = cause(ex, SqlBatchException.class, null);
 
+        assertInstanceOf(SqlBatchException.class, ex.getCause());
+        SqlBatchException batchEx = (SqlBatchException) ex.getCause();
+
+        assertEquals(Sql.DUPLICATE_KEYS_ERR, batchEx.code());
         assertEquals(err, batchEx.updateCounters().length);
         IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i]));
     }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 584fe501fe..72c54b9a7d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -18,11 +18,10 @@
 package org.apache.ignite.internal.sql.api;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.common.collect.Streams;
@@ -42,8 +41,8 @@ import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.ColumnNotFoundException;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
@@ -322,13 +321,12 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
             }
         }
 
-        Throwable ex = assertThrowsWithCause(
-                () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args),
-                SqlBatchException.class
+        SqlBatchException batchEx = assertThrows(
+                SqlBatchException.class,
+                () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args)
         );
-        assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache"));
-        SqlBatchException batchEx = cause(ex, SqlBatchException.class, null);
 
+        assertEquals(Sql.DUPLICATE_KEYS_ERR, batchEx.code());
         assertEquals(err, batchEx.updateCounters().length);
         IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i]));
     }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
index 2b29d7aac3..63657360ba 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
@@ -33,7 +33,9 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
@@ -41,6 +43,8 @@ import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -71,6 +75,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
 public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
     private static final IgniteLogger LOG = Loggers.forClass(AbstractBasicIntegrationTest.class);
 
+    /** Timeout should be big enough to prevent premature session expiration. */
+    private static final long SESSION_IDLE_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
+
     /** Base port number. */
     private static final int BASE_PORT = 3344;
 
@@ -287,9 +294,19 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
     }
 
     protected static List<List<Object>> sql(String sql, Object... args) {
-        return getAllFromCursor(
-                await(((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine().queryAsync("PUBLIC", sql, args).get(0))
-        );
+        var queryEngine = ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
+
+        SessionId sessionId = queryEngine.createSession(SESSION_IDLE_TIMEOUT, PropertiesHolder.fromMap(
+                Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC")
+        ));
+
+        try {
+            return getAllFromCursor(
+                    await(queryEngine.querySingleAsync(sessionId, QueryContext.of(), sql, args))
+            );
+        } finally {
+            queryEngine.closeSession(sessionId);
+        }
     }
 
     protected static void checkMetadata(ColumnMetadata expectedMeta, ColumnMetadata actualMeta) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
index 422795774e..f4eb3ca460 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
@@ -20,19 +20,22 @@ package org.apache.ignite.internal.sql.engine;
 import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.SqlException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
@@ -86,6 +89,60 @@ public class ItDmlTest extends AbstractBasicIntegrationTest {
                 .check();
     }
 
+    @Test
+    public void batchWithConflictShouldBeRejectedEntirely() {
+        sql("CREATE TABLE test (id int primary key, val int)");
+
+        sql("INSERT INTO test values (1, 1)");
+
+        assertQuery("SELECT count(*) FROM test")
+                .returns(1L)
+                .check();
+
+        var sqlException = assertThrows(
+                SqlException.class,
+                () -> sql("INSERT INTO test VALUES (0, 0), (1, 1), (2, 2)")
+        );
+
+        assertEquals(Sql.DUPLICATE_KEYS_ERR, sqlException.code());
+
+        assertQuery("SELECT count(*) FROM test")
+                .returns(1L)
+                .check();
+    }
+
+    /**
+     * Ensures that a big insert, although split into several chunks, runs within the same implicit transaction.
+     */
+    @Test
+    public void bigBatchSpanTheSameTransaction() {
+        List<Integer> values = new ArrayList<>(AbstractNode.MODIFY_BATCH_SIZE * 2);
+
+        // need to generate a batch big enough to be split into several chunks
+        for (int i = 0; i < AbstractNode.MODIFY_BATCH_SIZE * 1.5; i++) {
+            values.add(i);
+        }
+
+        values.add(values.get(0)); // add conflict entry from the first chunk
+
+        sql("CREATE TABLE test (id int primary key, val int default 1)");
+
+        String insertStatement = "INSERT INTO test (id) VALUES " + values.stream()
+                .map(Object::toString)
+                .collect(Collectors.joining("), (", "(", ")"));
+
+        SqlException sqlException = assertThrows(
+                SqlException.class,
+                () -> sql(insertStatement)
+        );
+
+        assertEquals(Sql.DUPLICATE_KEYS_ERR, sqlException.code());
+
+        assertQuery("SELECT count(*) FROM test")
+                .returns(0L)
+                .check();
+    }
+
     /**Test full MERGE command. */
     @Test
     public void testMerge() {
@@ -290,7 +347,7 @@ public class ItDmlTest extends AbstractBasicIntegrationTest {
                                 + "WHEN MATCHED THEN UPDATE SET b = test1.b + 1 "
                                 + "WHEN NOT MATCHED THEN INSERT (k, a, b) VALUES (0, a, b)"));
 
-        assertTrue(ex.getCause().getMessage().contains("Failed to MERGE some keys due to keys conflict"));
+        assertEquals(Sql.DUPLICATE_KEYS_ERR, ex.code());
     }
 
     @Test
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 08659a45f4..7c805fd7ea 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -359,6 +359,7 @@ public class IgniteImpl implements Ignite {
                 indexManager,
                 schemaManager,
                 dataStorageMgr,
+                txManager,
                 () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions())
         );
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
index ca8a894370..9afb44321c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
@@ -135,7 +135,7 @@ public class SessionBuilderImpl implements SessionBuilder {
     /** {@inheritDoc} */
     @Override
     public Session build() {
-        var propsHolder = PropertiesHolder.holderFor(
+        var propsHolder = PropertiesHolder.fromMap(
                 Map.of(
                         QueryProperty.QUERY_TIMEOUT, queryTimeout,
                         QueryProperty.DEFAULT_SCHEMA, schema
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index c8166664df..a8498a53d0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.sql.engine;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Sql query cursor.
@@ -30,6 +32,7 @@ import org.apache.ignite.sql.ResultSetMetadata;
 public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     private final SqlQueryType queryType;
     private final ResultSetMetadata meta;
+    private final @Nullable InternalTransaction implicitTx;
     private final AsyncCursor<T> dataCursor;
 
     /**
@@ -42,10 +45,12 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     public AsyncSqlCursorImpl(
             SqlQueryType queryType,
             ResultSetMetadata meta,
+            @Nullable InternalTransaction implicitTx,
             AsyncCursor<T> dataCursor
     ) {
         this.queryType = queryType;
         this.meta = meta;
+        this.implicitTx = implicitTx;
         this.dataCursor = dataCursor;
     }
 
@@ -66,9 +71,18 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     public CompletionStage<BatchedResult<T>> requestNextAsync(int rows) {
         return dataCursor.requestNextAsync(rows).handle((batch, t) -> {
             if (t != null) {
+                if (implicitTx != null) {
+                    implicitTx.rollback();
+                }
+
                 throw IgniteException.wrap(t);
             }
 
+            if (implicitTx != null && !batch.hasMore()) {
+                // last batch, need to commit transaction
+                implicitTx.commit();
+            }
+
             return batch;
         });
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index e8aa8f3152..c65b1e56fa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan.Type;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -125,6 +127,9 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private volatile SqlSchemaManager sqlSchemaManager;
 
+    /** Transaction manager. */
+    private final TxManager txManager;
+
     /** Constructor. */
     public SqlQueryProcessor(
             Consumer<Function<Long, CompletableFuture<?>>> registry,
@@ -133,6 +138,7 @@ public class SqlQueryProcessor implements QueryProcessor {
             IndexManager indexManager,
             SchemaManager schemaManager,
             DataStorageManager dataStorageManager,
+            TxManager txManager,
             Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier
     ) {
         this.registry = registry;
@@ -141,6 +147,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         this.indexManager = indexManager;
         this.schemaManager = schemaManager;
         this.dataStorageManager = dataStorageManager;
+        this.txManager = txManager;
         this.dataStorageFieldsSupplier = dataStorageFieldsSupplier;
     }
 
@@ -335,7 +342,7 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         InternalTransaction outerTx = context.unwrap(InternalTransaction.class);
 
-        final BaseQueryContext ctx = BaseQueryContext.builder()
+        BaseQueryContext ctx = BaseQueryContext.builder()
                 .cancel(new QueryCancel())
                 .frameworkConfig(
                         Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
@@ -378,11 +385,19 @@ public class SqlQueryProcessor implements QueryProcessor {
                     context.maybeUnwrap(QueryValidator.class)
                             .ifPresent(queryValidator -> queryValidator.validatePlan(plan));
 
-                    var dataCursor = executionSrvc.executePlan(plan, ctx);
+                    // Neither transactional DDL nor RO transactions are supported, hence
+                    // only DML, which requires an RW transaction, is covered
+                    boolean implicitTxRequired = plan.type() == Type.DML && outerTx == null;
+                    InternalTransaction implicitTx = implicitTxRequired ? txManager.begin() : null;
+
+                    BaseQueryContext enrichedContext = implicitTxRequired ? ctx.toBuilder().transaction(implicitTx).build() : ctx;
+
+                    var dataCursor = executionSrvc.executePlan(plan, enrichedContext);
 
                     return new AsyncSqlCursorImpl<>(
                             SqlQueryType.mapPlanTypeToSqlType(plan.type()),
                             plan.metadata(),
+                            implicitTx,
                             new AsyncCursor<List<Object>>() {
                                 @Override
                                 public CompletionStage<BatchedResult<List<Object>>> requestNextAsync(int rows) {
@@ -450,6 +465,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                         return new AsyncSqlCursorImpl<>(
                                 SqlQueryType.mapPlanTypeToSqlType(plan.type()),
                                 plan.metadata(),
+                                null,
                                 executionSrvc.executePlan(plan, ctx)
                         );
                     });
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index c3109c1862..189e0949c0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -131,7 +131,6 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             MailboxRegistry mailboxRegistry,
             ExchangeService exchangeSrvc,
             DataStorageManager dataStorageManager
-
     ) {
         return new ExecutionServiceImpl<>(
                 topSrvc.localMember().id(),
@@ -149,7 +148,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
     /**
      * Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public ExecutionServiceImpl(
+    ExecutionServiceImpl(
             String localNodeId,
             MessageService msgSrvc,
             MappingService mappingSrvc,
@@ -360,9 +359,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
         private volatile Long rootFragmentId = null;
 
-        private InternalTransaction transaction;
+        private @Nullable InternalTransaction transaction;
 
-        private DistributedQueryManager(BaseQueryContext ctx, InternalTransaction transaction) {
+        private DistributedQueryManager(
+                BaseQueryContext ctx,
+                @Nullable InternalTransaction transaction
+        ) {
             this(ctx);
 
             this.transaction = transaction;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index ca332d371d..601e6cf81d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -34,7 +34,7 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
  * Abstract node of execution tree.
  */
 public abstract class AbstractNode<RowT> implements Node<RowT> {
-    protected static final int MODIFY_BATCH_SIZE = 100; //IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100);
+    public static final int MODIFY_BATCH_SIZE = 100; //IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100);
 
     protected static final int IO_BATCH_SIZE = 256; //IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 256);
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index 47f1ab12e8..d6ad247ff4 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
@@ -36,14 +38,16 @@ import org.apache.ignite.internal.sql.engine.schema.ModifyRow;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.ErrorGroups;
+import org.apache.ignite.sql.SqlException;
 
 /**
  * ModifyNode.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<RowT>, Downstream<RowT> {
+    private static final IgniteLogger LOG = Loggers.forClass(ModifyNode.class);
+
     private final InternalIgniteTable table;
 
     private final TableModify.Operation modifyOp;
@@ -255,15 +259,10 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R
     }
 
     /** Transforms keys list to appropriate exception. */
-    private IgniteInternalException conflictKeysException(List<String> conflictKeys) {
-        if (modifyOp == TableModify.Operation.INSERT) {
-            return new IgniteInternalException("Failed to INSERT some keys because they are already in cache. "
-                    + "[rows=" + conflictKeys + ']');
-        } else {
-            return new IgniteInternalException(
-                    IgniteStringFormatter.format("Failed to MERGE some keys due to keys conflict or concurrent updates, "
-                            + "clashed input rows: {}", conflictKeys));
-        }
+    private RuntimeException conflictKeysException(List<String> conflictKeys) {
+        LOG.debug("Unable to update some keys because of conflict [op={}, keys={}]", modifyOp, conflictKeys);
+
+        return new SqlException(ErrorGroups.Sql.DUPLICATE_KEYS_ERR, "PK unique constraint is violated");
     }
 
     private enum State {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/property/PropertiesHolder.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/property/PropertiesHolder.java
index 72b9206bdd..5f0fd7572f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/property/PropertiesHolder.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/property/PropertiesHolder.java
@@ -66,7 +66,7 @@ public interface PropertiesHolder {
      * @param values Values to hold.
      * @return Holder.
      */
-    static PropertiesHolder holderFor(Map<Property<?>, Object> values) {
+    static PropertiesHolder fromMap(Map<Property<?>, Object> values) {
         for (Map.Entry<Property<?>, Object> e : values.entrySet()) {
             if (e.getValue() == null) {
                 continue;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 429e38b2ec..f99f5d35e7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -159,9 +159,9 @@ public final class BaseQueryContext extends AbstractQueryContext {
 
     private final Object[] parameters;
 
-    private CalciteCatalogReader catalogReader;
+    private final InternalTransaction tx;
 
-    private InternalTransaction tx;
+    private CalciteCatalogReader catalogReader;
 
     /**
      * Private constructor, used by a builder.
@@ -261,6 +261,21 @@ public final class BaseQueryContext extends AbstractQueryContext {
         return cancel;
     }
 
+    /**
+     * Creates a builder object filled with current context attributes.
+     *
+     * @return Prefilled builder.
+     */
+    public Builder toBuilder() {
+        return builder()
+                .queryId(queryId)
+                .frameworkConfig(cfg)
+                .logger(log)
+                .cancel(cancel)
+                .parameters(parameters)
+                .transaction(tx);
+    }
+
     /**
      * Query context builder.
      */
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 8be8f95c15..1782a52bcf 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -122,6 +123,9 @@ public class StopCalciteModuleTest {
     @Mock
     MessagingService msgSrvc;
 
+    @Mock
+    TxManager txManager;
+
     @Mock
     TopologyService topologySrvc;
 
@@ -224,6 +228,7 @@ public class StopCalciteModuleTest {
                 indexManager,
                 schemaManager,
                 dataStorageManager,
+                txManager,
                 Map::of
         );
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index f2747c9e23..ef840a005c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -277,6 +277,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 idxManager,
                 schemaManager,
                 dataStorageManager,
+                tm,
                 () -> dataStorageModules.collectSchemasFields(List.of(
                         RocksDbDataStorageConfigurationSchema.class,
                         TestConcurrentHashMapDataStorageConfigurationSchema.class
@@ -297,7 +298,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
      */
     @Test
     public void testInnerTxInitiated() throws Exception {
-        SessionId sesId = queryProc.createSession(1000, PropertiesHolder.holderFor(Map.of()));
+        SessionId sesId = queryProc.createSession(1000, PropertiesHolder.fromMap(Map.of()));
 
         InternalTransaction tx = mock(InternalTransaction.class);
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
index 992dbc670d..a8129c63cc 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
@@ -55,7 +55,7 @@ class SessionManagerTest {
 
     @Test
     void sessionGet() {
-        PropertiesHolder propHldr = PropertiesHolder.holderFor(Map.of());
+        PropertiesHolder propHldr = PropertiesHolder.fromMap(Map.of());
 
         SessionId sessionId = sessionMgr.createSession(12345, propHldr);