You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2022/08/28 17:36:02 UTC
[ignite-3] branch main updated: IGNITE-17434 Fix partially correct previous implementation of sql implicit transactions (#990)
This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 757d70edf2 IGNITE-17434 Fix partially correct previous implementation of sql implicit transactions (#990)
757d70edf2 is described below
commit 757d70edf23d758a8847412a461184505bd35d5a
Author: Evgeniy Stanilovskiy <st...@gmail.com>
AuthorDate: Sun Aug 28 20:35:57 2022 +0300
IGNITE-17434 Fix partially correct previous implementation of sql implicit transactions (#990)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 ++
.../runner/app/jdbc/ItJdbcBatchSelfTest.java | 2 +-
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 17 +++---
.../internal/sql/api/ItSqlSynchronousApiTest.java | 14 +++--
.../sql/engine/AbstractBasicIntegrationTest.java | 23 ++++++--
.../ignite/internal/sql/engine/ItDmlTest.java | 61 +++++++++++++++++++++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
.../internal/sql/api/SessionBuilderImpl.java | 2 +-
.../internal/sql/engine/AsyncSqlCursorImpl.java | 14 +++++
.../internal/sql/engine/SqlQueryProcessor.java | 20 ++++++-
.../sql/engine/exec/ExecutionServiceImpl.java | 10 ++--
.../internal/sql/engine/exec/rel/AbstractNode.java | 2 +-
.../internal/sql/engine/exec/rel/ModifyNode.java | 21 ++++----
.../sql/engine/property/PropertiesHolder.java | 2 +-
.../internal/sql/engine/util/BaseQueryContext.java | 19 ++++++-
.../internal/sql/engine/StopCalciteModuleTest.java | 5 ++
.../sql/engine/exec/MockedStructuresTest.java | 3 +-
.../sql/engine/session/SessionManagerTest.java | 2 +-
18 files changed, 175 insertions(+), 46 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 2a47869979..fcfb2c8e30 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -114,6 +114,9 @@ public class ErrorGroups {
/** Cursor is already closed error. */
public static final int CURSOR_CLOSED_ERR = SQL_ERR_GROUP.registerErrorCode(9);
+
+ /** Some keys can't be inserted because they violate the PK unique constraint. */
+ public static final int DUPLICATE_KEYS_ERR = SQL_ERR_GROUP.registerErrorCode(10);
}
/** Meta storage error group. */
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
index 0e56acffaa..00719e7f4b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcBatchSelfTest.java
@@ -311,7 +311,7 @@ public class ItJdbcBatchSelfTest extends AbstractJdbcSelfTest {
assertEquals(i + 1, updCnts[i], "Invalid update count: " + i);
}
- if (!e.getMessage().contains("Failed to INSERT some keys because they are already in cache")) {
+ if (!e.getMessage().contains("PK unique constraint is violated")) {
log.error("Invalid exception: ", e);
fail();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 6f00082206..2ef0fa7466 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.sql.api;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -38,6 +36,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
@@ -56,8 +55,8 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.ColumnAlreadyExistsException;
import org.apache.ignite.lang.ColumnNotFoundException;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
@@ -595,13 +594,15 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
}
}
- Throwable ex = assertThrowsWithCause(
- () -> ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join(),
- SqlBatchException.class
+ CompletionException ex = assertThrows(
+ CompletionException.class,
+ () -> ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join()
);
- assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache"));
- SqlBatchException batchEx = cause(ex, SqlBatchException.class, null);
+ assertInstanceOf(SqlBatchException.class, ex.getCause());
+ SqlBatchException batchEx = (SqlBatchException) ex.getCause();
+
+ assertEquals(Sql.DUPLICATE_KEYS_ERR, batchEx.code());
assertEquals(err, batchEx.updateCounters().length);
IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i]));
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 584fe501fe..72c54b9a7d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -18,11 +18,10 @@
package org.apache.ignite.internal.sql.api;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Streams;
@@ -42,8 +41,8 @@ import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.lang.ColumnAlreadyExistsException;
import org.apache.ignite.lang.ColumnNotFoundException;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
@@ -322,13 +321,12 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
}
}
- Throwable ex = assertThrowsWithCause(
- () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args),
- SqlBatchException.class
+ SqlBatchException batchEx = assertThrows(
+ SqlBatchException.class,
+ () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args)
);
- assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache"));
- SqlBatchException batchEx = cause(ex, SqlBatchException.class, null);
+ assertEquals(Sql.DUPLICATE_KEYS_ERR, batchEx.code());
assertEquals(err, batchEx.updateCounters().length);
IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i]));
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
index 2b29d7aac3..63657360ba 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
@@ -33,7 +33,9 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
@@ -41,6 +43,8 @@ import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -71,6 +75,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
private static final IgniteLogger LOG = Loggers.forClass(AbstractBasicIntegrationTest.class);
+ /** Timeout should be big enough to prevent premature session expiration. */
+ private static final long SESSION_IDLE_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
+
/** Base port number. */
private static final int BASE_PORT = 3344;
@@ -287,9 +294,19 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
}
protected static List<List<Object>> sql(String sql, Object... args) {
- return getAllFromCursor(
- await(((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine().queryAsync("PUBLIC", sql, args).get(0))
- );
+ var queryEngine = ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
+
+ SessionId sessionId = queryEngine.createSession(SESSION_IDLE_TIMEOUT, PropertiesHolder.fromMap(
+ Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC")
+ ));
+
+ try {
+ return getAllFromCursor(
+ await(queryEngine.querySingleAsync(sessionId, QueryContext.of(), sql, args))
+ );
+ } finally {
+ queryEngine.closeSession(sessionId);
+ }
}
protected static void checkMetadata(ColumnMetadata expectedMeta, ColumnMetadata actualMeta) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
index 422795774e..f4eb3ca460 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
@@ -20,19 +20,22 @@ package org.apache.ignite.internal.sql.engine;
import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.SqlException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
@@ -86,6 +89,60 @@ public class ItDmlTest extends AbstractBasicIntegrationTest {
.check();
}
+ @Test
+ public void batchWithConflictShouldBeRejectedEntirely() {
+ sql("CREATE TABLE test (id int primary key, val int)");
+
+ sql("INSERT INTO test values (1, 1)");
+
+ assertQuery("SELECT count(*) FROM test")
+ .returns(1L)
+ .check();
+
+ var sqlException = assertThrows(
+ SqlException.class,
+ () -> sql("INSERT INTO test VALUES (0, 0), (1, 1), (2, 2)")
+ );
+
+ assertEquals(Sql.DUPLICATE_KEYS_ERR, sqlException.code());
+
+ assertQuery("SELECT count(*) FROM test")
+ .returns(1L)
+ .check();
+ }
+
+ /**
+ * Ensures that a big insert, although split into several chunks, shares the same implicit transaction.
+ */
+ @Test
+ public void bigBatchSpanTheSameTransaction() {
+ List<Integer> values = new ArrayList<>(AbstractNode.MODIFY_BATCH_SIZE * 2);
+
+ // Need to generate a batch big enough to be split into several chunks.
+ for (int i = 0; i < AbstractNode.MODIFY_BATCH_SIZE * 1.5; i++) {
+ values.add(i);
+ }
+
+ values.add(values.get(0)); // add conflict entry from the first chunk
+
+ sql("CREATE TABLE test (id int primary key, val int default 1)");
+
+ String insertStatement = "INSERT INTO test (id) VALUES " + values.stream()
+ .map(Object::toString)
+ .collect(Collectors.joining("), (", "(", ")"));
+
+ SqlException sqlException = assertThrows(
+ SqlException.class,
+ () -> sql(insertStatement)
+ );
+
+ assertEquals(Sql.DUPLICATE_KEYS_ERR, sqlException.code());
+
+ assertQuery("SELECT count(*) FROM test")
+ .returns(0L)
+ .check();
+ }
+
/**Test full MERGE command. */
@Test
public void testMerge() {
@@ -290,7 +347,7 @@ public class ItDmlTest extends AbstractBasicIntegrationTest {
+ "WHEN MATCHED THEN UPDATE SET b = test1.b + 1 "
+ "WHEN NOT MATCHED THEN INSERT (k, a, b) VALUES (0, a, b)"));
- assertTrue(ex.getCause().getMessage().contains("Failed to MERGE some keys due to keys conflict"));
+ assertEquals(Sql.DUPLICATE_KEYS_ERR, ex.code());
}
@Test
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 08659a45f4..7c805fd7ea 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -359,6 +359,7 @@ public class IgniteImpl implements Ignite {
indexManager,
schemaManager,
dataStorageMgr,
+ txManager,
() -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions())
);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
index ca8a894370..9afb44321c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
@@ -135,7 +135,7 @@ public class SessionBuilderImpl implements SessionBuilder {
/** {@inheritDoc} */
@Override
public Session build() {
- var propsHolder = PropertiesHolder.holderFor(
+ var propsHolder = PropertiesHolder.fromMap(
Map.of(
QueryProperty.QUERY_TIMEOUT, queryTimeout,
QueryProperty.DEFAULT_SCHEMA, schema
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index c8166664df..a8498a53d0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.sql.engine;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.Nullable;
/**
* Sql query cursor.
@@ -30,6 +32,7 @@ import org.apache.ignite.sql.ResultSetMetadata;
public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
private final SqlQueryType queryType;
private final ResultSetMetadata meta;
+ private final @Nullable InternalTransaction implicitTx;
private final AsyncCursor<T> dataCursor;
/**
@@ -42,10 +45,12 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
public AsyncSqlCursorImpl(
SqlQueryType queryType,
ResultSetMetadata meta,
+ @Nullable InternalTransaction implicitTx,
AsyncCursor<T> dataCursor
) {
this.queryType = queryType;
this.meta = meta;
+ this.implicitTx = implicitTx;
this.dataCursor = dataCursor;
}
@@ -66,9 +71,18 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
public CompletionStage<BatchedResult<T>> requestNextAsync(int rows) {
return dataCursor.requestNextAsync(rows).handle((batch, t) -> {
if (t != null) {
+ if (implicitTx != null) {
+ implicitTx.rollback();
+ }
+
throw IgniteException.wrap(t);
}
+ if (implicitTx != null && !batch.hasMore()) {
+ // last batch, need to commit transaction
+ implicitTx.commit();
+ }
+
return batch;
});
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index e8aa8f3152..c65b1e56fa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan.Type;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
@@ -125,6 +127,9 @@ public class SqlQueryProcessor implements QueryProcessor {
private volatile SqlSchemaManager sqlSchemaManager;
+ /** Transaction manager. */
+ private final TxManager txManager;
+
/** Constructor. */
public SqlQueryProcessor(
Consumer<Function<Long, CompletableFuture<?>>> registry,
@@ -133,6 +138,7 @@ public class SqlQueryProcessor implements QueryProcessor {
IndexManager indexManager,
SchemaManager schemaManager,
DataStorageManager dataStorageManager,
+ TxManager txManager,
Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier
) {
this.registry = registry;
@@ -141,6 +147,7 @@ public class SqlQueryProcessor implements QueryProcessor {
this.indexManager = indexManager;
this.schemaManager = schemaManager;
this.dataStorageManager = dataStorageManager;
+ this.txManager = txManager;
this.dataStorageFieldsSupplier = dataStorageFieldsSupplier;
}
@@ -335,7 +342,7 @@ public class SqlQueryProcessor implements QueryProcessor {
InternalTransaction outerTx = context.unwrap(InternalTransaction.class);
- final BaseQueryContext ctx = BaseQueryContext.builder()
+ BaseQueryContext ctx = BaseQueryContext.builder()
.cancel(new QueryCancel())
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
@@ -378,11 +385,19 @@ public class SqlQueryProcessor implements QueryProcessor {
context.maybeUnwrap(QueryValidator.class)
.ifPresent(queryValidator -> queryValidator.validatePlan(plan));
- var dataCursor = executionSrvc.executePlan(plan, ctx);
+ // Neither transactional DDL nor RO transactions are supported, hence
+ // only DML, which requires an RW transaction, is covered.
+ boolean implicitTxRequired = plan.type() == Type.DML && outerTx == null;
+ InternalTransaction implicitTx = implicitTxRequired ? txManager.begin() : null;
+
+ BaseQueryContext enrichedContext = implicitTxRequired ? ctx.toBuilder().transaction(implicitTx).build() : ctx;
+
+ var dataCursor = executionSrvc.executePlan(plan, enrichedContext);
return new AsyncSqlCursorImpl<>(
SqlQueryType.mapPlanTypeToSqlType(plan.type()),
plan.metadata(),
+ implicitTx,
new AsyncCursor<List<Object>>() {
@Override
public CompletionStage<BatchedResult<List<Object>>> requestNextAsync(int rows) {
@@ -450,6 +465,7 @@ public class SqlQueryProcessor implements QueryProcessor {
return new AsyncSqlCursorImpl<>(
SqlQueryType.mapPlanTypeToSqlType(plan.type()),
plan.metadata(),
+ null,
executionSrvc.executePlan(plan, ctx)
);
});
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index c3109c1862..189e0949c0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -131,7 +131,6 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
MailboxRegistry mailboxRegistry,
ExchangeService exchangeSrvc,
DataStorageManager dataStorageManager
-
) {
return new ExecutionServiceImpl<>(
topSrvc.localMember().id(),
@@ -149,7 +148,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
/**
* Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public ExecutionServiceImpl(
+ ExecutionServiceImpl(
String localNodeId,
MessageService msgSrvc,
MappingService mappingSrvc,
@@ -360,9 +359,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
private volatile Long rootFragmentId = null;
- private InternalTransaction transaction;
+ private @Nullable InternalTransaction transaction;
- private DistributedQueryManager(BaseQueryContext ctx, InternalTransaction transaction) {
+ private DistributedQueryManager(
+ BaseQueryContext ctx,
+ @Nullable InternalTransaction transaction
+ ) {
this(ctx);
this.transaction = transaction;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index ca332d371d..601e6cf81d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -34,7 +34,7 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
* Abstract node of execution tree.
*/
public abstract class AbstractNode<RowT> implements Node<RowT> {
- protected static final int MODIFY_BATCH_SIZE = 100; //IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100);
+ public static final int MODIFY_BATCH_SIZE = 100; //IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100);
protected static final int IO_BATCH_SIZE = 256; //IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 256);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index 47f1ab12e8..d6ad247ff4 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
@@ -36,14 +38,16 @@ import org.apache.ignite.internal.sql.engine.schema.ModifyRow;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.ErrorGroups;
+import org.apache.ignite.sql.SqlException;
/**
* ModifyNode.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<RowT>, Downstream<RowT> {
+ private static final IgniteLogger LOG = Loggers.forClass(ModifyNode.class);
+
private final InternalIgniteTable table;
private final TableModify.Operation modifyOp;
@@ -255,15 +259,10 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R
}
/** Transforms keys list to appropriate exception. */
- private IgniteInternalException conflictKeysException(List<String> conflictKeys) {
- if (modifyOp == TableModify.Operation.INSERT) {
- return new IgniteInternalException("Failed to INSERT some keys because they are already in cache. "
- + "[rows=" + conflictKeys + ']');
- } else {
- return new IgniteInternalException(
- IgniteStringFormatter.format("Failed to MERGE some keys due to keys conflict or concurrent updates, "
- + "clashed input rows: {}", conflictKeys));
- }
+ private RuntimeException conflictKeysException(List<String> conflictKeys) {
+ LOG.debug("Unable to update some keys because of conflict [op={}, keys={}]", modifyOp, conflictKeys);
+
+ return new SqlException(ErrorGroups.Sql.DUPLICATE_KEYS_ERR, "PK unique constraint is violated");
}
private enum State {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/property/PropertiesHolder.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/property/PropertiesHolder.java
index 72b9206bdd..5f0fd7572f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/property/PropertiesHolder.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/property/PropertiesHolder.java
@@ -66,7 +66,7 @@ public interface PropertiesHolder {
* @param values Values to hold.
* @return Holder.
*/
- static PropertiesHolder holderFor(Map<Property<?>, Object> values) {
+ static PropertiesHolder fromMap(Map<Property<?>, Object> values) {
for (Map.Entry<Property<?>, Object> e : values.entrySet()) {
if (e.getValue() == null) {
continue;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 429e38b2ec..f99f5d35e7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -159,9 +159,9 @@ public final class BaseQueryContext extends AbstractQueryContext {
private final Object[] parameters;
- private CalciteCatalogReader catalogReader;
+ private final InternalTransaction tx;
- private InternalTransaction tx;
+ private CalciteCatalogReader catalogReader;
/**
* Private constructor, used by a builder.
@@ -261,6 +261,21 @@ public final class BaseQueryContext extends AbstractQueryContext {
return cancel;
}
+ /**
+ * Creates a builder object filled with current context attributes.
+ *
+ * @return Prefilled builder.
+ */
+ public Builder toBuilder() {
+ return builder()
+ .queryId(queryId)
+ .frameworkConfig(cfg)
+ .logger(log)
+ .cancel(cancel)
+ .parameters(parameters)
+ .transaction(tx);
+ }
+
/**
* Query context builder.
*/
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 8be8f95c15..1782a52bcf 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
@@ -122,6 +123,9 @@ public class StopCalciteModuleTest {
@Mock
MessagingService msgSrvc;
+ @Mock
+ TxManager txManager;
+
@Mock
TopologyService topologySrvc;
@@ -224,6 +228,7 @@ public class StopCalciteModuleTest {
indexManager,
schemaManager,
dataStorageManager,
+ txManager,
Map::of
);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index f2747c9e23..ef840a005c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -277,6 +277,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
idxManager,
schemaManager,
dataStorageManager,
+ tm,
() -> dataStorageModules.collectSchemasFields(List.of(
RocksDbDataStorageConfigurationSchema.class,
TestConcurrentHashMapDataStorageConfigurationSchema.class
@@ -297,7 +298,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
*/
@Test
public void testInnerTxInitiated() throws Exception {
- SessionId sesId = queryProc.createSession(1000, PropertiesHolder.holderFor(Map.of()));
+ SessionId sesId = queryProc.createSession(1000, PropertiesHolder.fromMap(Map.of()));
InternalTransaction tx = mock(InternalTransaction.class);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
index 992dbc670d..a8129c63cc 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
@@ -55,7 +55,7 @@ class SessionManagerTest {
@Test
void sessionGet() {
- PropertiesHolder propHldr = PropertiesHolder.holderFor(Map.of());
+ PropertiesHolder propHldr = PropertiesHolder.fromMap(Map.of());
SessionId sessionId = sessionMgr.createSession(12345, propHldr);