You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2023/08/25 13:55:54 UTC
[ignite-3] branch main updated: IGNITE-19898 Sql. Added ability to use different transaction facades to start an implicit transaction (#2449)
This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4fd6b984d5 IGNITE-19898 Sql. Added ability to use different transaction facades to start an implicit transaction (#2449)
4fd6b984d5 is described below
commit 4fd6b984d5a46ddb2bd6def55365a6b349feb720
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Fri Aug 25 16:55:48 2023 +0300
IGNITE-19898 Sql. Added ability to use different transaction facades to start an implicit transaction (#2449)
---
.../internal/cli/CliIntegrationTestBase.java | 7 +-
.../client/handler/JdbcQueryEventHandlerImpl.java | 2 +
.../requests/sql/ClientSqlExecuteRequest.java | 4 +-
.../handler/JdbcQueryEventHandlerImplTest.java | 8 +-
.../requests/jdbc/JdbcQueryCursorSelfTest.java | 15 +-
.../client/fakes/FakeIgniteQueryProcessor.java | 3 +-
.../internal/ClusterPerTestIntegrationTest.java | 2 +-
.../benchmark/AbstractOneNodeBenchmark.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 1 -
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 48 ++--
.../internal/sql/api/ItSqlSynchronousApiTest.java | 22 +-
.../sql/engine/ClusterPerClassIntegrationTest.java | 7 +-
.../engine/datatypes/tests/BaseDataTypeTest.java | 6 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../sql/engine/util/TestQueryProcessor.java | 13 +-
.../ignite/internal/sql/api/IgniteSqlImpl.java | 9 +-
.../internal/sql/api/SessionBuilderImpl.java | 8 +-
.../ignite/internal/sql/api/SessionImpl.java | 12 +-
.../internal/sql/engine/AsyncSqlCursorImpl.java | 21 +-
.../ignite/internal/sql/engine/QueryProcessor.java | 6 +-
.../sql/engine/QueryTransactionWrapper.java | 59 +++++
.../internal/sql/engine/SqlQueryProcessor.java | 252 +++++++++++----------
.../sql/engine/AsyncSqlCursorImplTest.java | 51 ++---
.../engine/QueryTransactionWrapperSelfTest.java | 131 +++++++++++
.../internal/sql/engine/StopCalciteModuleTest.java | 10 +-
.../sql/engine/exec/MockedStructuresTest.java | 88 +++----
.../internal/sql/engine/util/QueryChecker.java | 9 +-
.../ignite/internal/table/AbstractTableView.java | 2 +-
28 files changed, 522 insertions(+), 279 deletions(-)
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
index becb71c8cd..b06ea4bc9d 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
@@ -29,11 +29,13 @@ import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.sql.engine.AsyncCursor;
import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.testframework.IntegrationTestBase;
import org.apache.ignite.table.Table;
+import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInstance;
@@ -88,7 +90,8 @@ public abstract class CliIntegrationTestBase extends IntegrationTestBase {
}
protected static List<List<Object>> sql(@Nullable Transaction tx, String sql, Object... args) {
- var queryEngine = ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
+ QueryProcessor queryEngine = ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
+ IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions();
SessionId sessionId = queryEngine.createSession(PropertiesHelper.emptyHolder());
@@ -96,7 +99,7 @@ public abstract class CliIntegrationTestBase extends IntegrationTestBase {
var context = QueryContext.create(SqlQueryType.ALL, tx);
return getAllFromCursor(
- await(queryEngine.querySingleAsync(sessionId, context, sql, args))
+ await(queryEngine.querySingleAsync(sessionId, context, transactions, sql, args))
);
} finally {
queryEngine.closeSession(sessionId);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index c04486f5ec..ea632c4f4d 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -165,6 +165,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
CompletableFuture<AsyncSqlCursor<List<Object>>> result = connectionContext.doInSession(sessionId -> processor.querySingleAsync(
sessionId,
context,
+ igniteTransactions,
req.sqlQuery(),
req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments()
));
@@ -271,6 +272,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler {
CompletableFuture<AsyncSqlCursor<List<Object>>> result = connCtx.doInSession(sessionId -> processor.querySingleAsync(
sessionId,
queryContext,
+ igniteTransactions,
sql,
arg == null ? OBJECT_EMPTY_ARRAY : arg
));
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index ad8675dd08..616c949a7d 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -79,7 +79,7 @@ public class ClientSqlExecuteRequest {
arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
}
- // TODO IGNITE-19898 SQL implicit RO transaction should use observation timestamp.
+ // TODO IGNITE-20232 Propagate observable timestamp to sql engine using internal API.
HybridTimestamp unused = HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
return session
@@ -87,7 +87,7 @@ public class ClientSqlExecuteRequest {
.thenCompose(asyncResultSet -> {
//noinspection StatementWithEmptyBody
if (tx == null) {
- // TODO IGNITE-19898 Return readTimestamp from implicit RO TX to the client
+ // TODO IGNITE-20232 Propagate observable timestamp to sql engine using internal API.
// out.meta(asyncResultSet.tx().readTimestamp());
}
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
index 89823b4cb9..6477c9b8bc 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
@@ -175,7 +175,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
when(queryProcessor.createSession(any())).thenReturn(expectedSessionId);
- when(queryProcessor.querySingleAsync(eq(expectedSessionId), any(), any()))
+ when(queryProcessor.querySingleAsync(eq(expectedSessionId), any(), any(), any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("This is fine")));
long connectionId = acquireConnectionId();
@@ -185,7 +185,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
)));
verify(queryProcessor).createSession(any());
- verify(queryProcessor).querySingleAsync(eq(expectedSessionId), any(), any(), any(Object[].class));
+ verify(queryProcessor).querySingleAsync(eq(expectedSessionId), any(), any(), any(), any(Object[].class));
verifyNoMoreInteractions(queryProcessor);
}
@@ -240,7 +240,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
@Test
public void singleTxUsedForMultipleOperations() {
- when(queryProcessor.querySingleAsync(any(), any(), any()))
+ when(queryProcessor.querySingleAsync(any(), any(), any(), any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("Expected")));
Transaction tx = mock(Transaction.class);
@@ -273,7 +273,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest {
verify(tx).commitAsync();
verifyNoMoreInteractions(igniteTransactions);
- verify(queryProcessor, times(5)).querySingleAsync(any(), any(), any(), any(Object[].class));
+ verify(queryProcessor, times(5)).querySingleAsync(any(), any(), any(), any(), any(Object[].class));
}
@Test
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
index 3d1ae15c63..29bc7db0f4 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
@@ -24,17 +24,26 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
+import org.apache.ignite.internal.sql.engine.QueryTransactionWrapper;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.AsyncWrapper;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
/**
* Test class for {@link JdbcQueryCursor}.
*/
-public class JdbcQueryCursorSelfTest {
+@ExtendWith(MockitoExtension.class)
+public class JdbcQueryCursorSelfTest extends BaseIgniteAbstractTest {
+ @Mock
+ private QueryTransactionWrapper txWrapper;
+
private static final List<Integer> ROWS = List.of(1, 2, 3);
private static final int TOTAL_ROWS_COUNT = ROWS.size();
@@ -58,9 +67,9 @@ public class JdbcQueryCursorSelfTest {
assertEquals(ROWS, results);
}
- private static List<Integer> fetchFullBatch(int maxRows, int fetchSize) {
+ private List<Integer> fetchFullBatch(int maxRows, int fetchSize) {
JdbcQueryCursor<Integer> cursor = new JdbcQueryCursor<>(maxRows,
- new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, null, null,
+ new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, null, txWrapper,
new AsyncWrapper<>(CompletableFuture.completedFuture(ROWS.iterator()), Runnable::run)));
List<Integer> results = new ArrayList<>(maxRows);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index 08106c9cc6..c8c73fa78b 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.session.SessionInfo;
+import org.apache.ignite.tx.IgniteTransactions;
/**
* Fake {@link QueryProcessor}.
@@ -49,7 +50,7 @@ public class FakeIgniteQueryProcessor implements QueryProcessor {
@Override
public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
- SessionId sessionid, QueryContext context, String qry,
+ SessionId sessionid, QueryContext context, IgniteTransactions transactions, String qry,
Object... params) {
return CompletableFuture.completedFuture(new FakeCursor());
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 2413724fc4..9d3c4a57fd 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -207,7 +207,7 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
QueryContext context = QueryContext.create(SqlQueryType.ALL);
return getAllFromCursor(
- qryProc.querySingleAsync(sessionId, context, sql, args).join()
+ qryProc.querySingleAsync(sessionId, context, node(0).transactions(), sql, args).join()
);
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
index 88f177aa8b..3de017059b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
@@ -114,7 +114,7 @@ public class AbstractOneNodeBenchmark {
var context = QueryContext.create(SqlQueryType.ALL);
getAllFromCursor(
- await(queryEngine.querySingleAsync(sessionId, context, sql))
+ await(queryEngine.querySingleAsync(sessionId, context, clusterNode.transactions(), sql))
);
} finally {
queryEngine.closeSession(sessionId);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index fbaf31240f..5a661e2634 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -400,7 +400,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
indexManager,
schemaManager,
dataStorageManager,
- txManager,
distributionZoneManager,
() -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
replicaSvc,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 48615d41ba..10ee8fb135 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -459,13 +459,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void select() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build();
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
TestPageProcessor pageProc = new TestPageProcessor(4);
await(ses.executeAsync(null, "SELECT ID FROM TEST").thenCompose(pageProc));
@@ -481,11 +482,12 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void metadata() {
sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT NULL)");
- sql("INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().build();
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
+
AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1, COL0 FROM TEST"));
// Validate columns metadata.
@@ -561,13 +563,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void pageSequence() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(1).build();
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
AsyncResultSet<SqlRow> ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST ORDER BY ID"));
var p0 = ars0.currentPage();
AsyncResultSet<SqlRow> ars1 = await(ars0.fetchNextPage());
@@ -601,13 +604,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void errors() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build();
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
// Parse error.
checkError(SqlException.class, STMT_PARSE_ERR, "Failed to parse query", ses, "SELECT ID FROM");
@@ -685,13 +689,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void closeSession() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST"));
await(ses.closeAsync());
@@ -769,13 +774,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void resultSetCloseShouldFinishImplicitTransaction() throws InterruptedException {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
IgniteSql sql = igniteSql();
-
Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST");
AsyncResultSet<SqlRow> ars = f.join();
@@ -790,14 +796,16 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void resultSetFullReadShouldFinishImplicitTransaction() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
IgniteSql sql = igniteSql();
// Fetch all data in one read.
Session ses = sql.sessionBuilder().defaultPageSize(100).build();
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST");
AsyncResultSet<SqlRow> ars = f.join();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index dc1bf88eb7..518c8faf1a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -249,13 +249,14 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void select() throws Exception {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build();
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
ResultSet<SqlRow> rs = ses.execute(null, "SELECT ID FROM TEST");
Set<Integer> set = Streams.stream(rs).map(r -> r.intValue(0)).collect(Collectors.toSet());
@@ -272,13 +273,13 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void errors() throws InterruptedException {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
// Parse error.
checkError(SqlException.class, STMT_PARSE_ERR, "Failed to parse query", ses, "SELECT ID FROM");
@@ -420,13 +421,14 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest {
@Test
public void resultSetCloseShouldFinishImplicitTransaction() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST");
assertEquals(1, txManager().pending());
rs.close();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index 1f31358f2b..bf2fb5ac01 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -287,6 +287,11 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe
protected QueryProcessor getEngine() {
return ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
}
+
+ @Override
+ protected IgniteTransactions transactions() {
+ return CLUSTER_NODES.get(0).transactions();
+ }
};
}
@@ -446,7 +451,7 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe
var context = QueryContext.create(SqlQueryType.ALL, tx);
return getAllFromCursor(
- await(queryEngine.querySingleAsync(sessionId, context, sql, args))
+ await(queryEngine.querySingleAsync(sessionId, context, CLUSTER_NODES.get(0).transactions(), sql, args))
);
} finally {
queryEngine.closeSession(sessionId);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
index 1a795c0325..bceb796de4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.sql.engine.util.QueryChecker.QueryTemplate;
import org.apache.ignite.internal.sql.engine.util.TestQueryProcessor;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.provider.Arguments;
@@ -125,6 +126,11 @@ public abstract class BaseDataTypeTest<T extends Comparable<T>> extends ClusterP
return ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine();
}
+ @Override
+ protected IgniteTransactions transactions() {
+ return igniteTx();
+ }
+
@Override
protected void checkMetadata(AsyncSqlCursor<?> cursor) {
Optional<ColumnMetadata> testKey = cursor.metadata().columns()
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index bbc51f28bd..9828499320 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -555,7 +555,6 @@ public class IgniteImpl implements Ignite {
indexManager,
schemaManager,
dataStorageMgr,
- txManager,
distributionZoneManager,
() -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
replicaSvc,
@@ -564,7 +563,7 @@ public class IgniteImpl implements Ignite {
metricManager
);
- sql = new IgniteSqlImpl(qryEngine);
+ sql = new IgniteSqlImpl(qryEngine, new IgniteTransactionsImpl(txManager));
var deploymentManagerImpl = new DeploymentManagerImpl(
clusterSvc,
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
index a62a433cf1..b78178768a 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.session.SessionInfo;
+import org.apache.ignite.tx.IgniteTransactions;
/**
* {@link QueryProcessor} that handles test {@link NativeTypeWrapper native type wrappers} .
@@ -71,11 +72,15 @@ public final class TestQueryProcessor implements QueryProcessor {
/** {@inheritDoc} */
@Override
- public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(SessionId sessionId, QueryContext context, String qry,
- Object... params) {
-
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
+ SessionId sessionId,
+ QueryContext context,
+ IgniteTransactions transactions,
+ String qry,
+ Object... params
+ ) {
Object[] unwrappedParams = Arrays.stream(params).map(NativeTypeWrapper::unwrap).toArray();
- return queryProcessor.querySingleAsync(sessionId, context, qry, unwrappedParams);
+ return queryProcessor.querySingleAsync(sessionId, context, transactions, qry, unwrappedParams);
}
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index 6f164001b2..8f674bfe17 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -24,6 +24,7 @@ import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.Session.SessionBuilder;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.IgniteTransactions;
/**
* Embedded implementation of the Ignite SQL query facade.
@@ -31,13 +32,17 @@ import org.apache.ignite.sql.Statement.StatementBuilder;
public class IgniteSqlImpl implements IgniteSql {
private final QueryProcessor qryProc;
+ private final IgniteTransactions transactions;
+
/**
* Constructor.
*
* @param qryProc Query processor.
+ * @param transactions Transactions facade.
*/
- public IgniteSqlImpl(QueryProcessor qryProc) {
+ public IgniteSqlImpl(QueryProcessor qryProc, IgniteTransactions transactions) {
this.qryProc = qryProc;
+ this.transactions = transactions;
}
/** {@inheritDoc} */
@@ -49,7 +54,7 @@ public class IgniteSqlImpl implements IgniteSql {
/** {@inheritDoc} */
@Override
public SessionBuilder sessionBuilder() {
- return new SessionBuilderImpl(qryProc, new HashMap<>());
+ return new SessionBuilderImpl(qryProc, transactions, new HashMap<>());
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
index 352db1c689..3ed4ca61b2 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.session.SessionProperty;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.Session.SessionBuilder;
+import org.apache.ignite.tx.IgniteTransactions;
import org.jetbrains.annotations.Nullable;
/**
@@ -40,6 +41,8 @@ public class SessionBuilderImpl implements SessionBuilder {
private final QueryProcessor qryProc;
+ private final IgniteTransactions transactions;
+
private long queryTimeout = DEFAULT_QUERY_TIMEOUT;
private long sessionTimeout = DEFAULT_SESSION_TIMEOUT;
@@ -54,10 +57,12 @@ public class SessionBuilderImpl implements SessionBuilder {
* Session builder constructor.
*
* @param qryProc SQL query processor.
+ * @param transactions Transactions facade.
* @param props Initial properties.
*/
- SessionBuilderImpl(QueryProcessor qryProc, Map<String, Object> props) {
+ SessionBuilderImpl(QueryProcessor qryProc, IgniteTransactions transactions, Map<String, Object> props) {
this.qryProc = qryProc;
+ this.transactions = transactions;
this.props = props;
}
@@ -145,6 +150,7 @@ public class SessionBuilderImpl implements SessionBuilder {
return new SessionImpl(
sessionId,
qryProc,
+ transactions,
pageSize,
propsHolder
);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 638ad0495b..fca3f6ae50 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -59,6 +59,7 @@ import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.sql.reactive.ReactiveResultSet;
import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -73,6 +74,8 @@ public class SessionImpl implements AbstractSession {
private final QueryProcessor qryProc;
+ private final IgniteTransactions transactions;
+
private final SessionId sessionId;
private final int pageSize;
@@ -83,16 +86,19 @@ public class SessionImpl implements AbstractSession {
* Constructor.
*
* @param qryProc Query processor.
+ * @param transactions Transactions facade.
* @param pageSize Query fetch page size.
* @param props Session's properties.
*/
SessionImpl(
SessionId sessionId,
QueryProcessor qryProc,
+ IgniteTransactions transactions,
int pageSize,
PropertiesHolder props
) {
this.qryProc = qryProc;
+ this.transactions = transactions;
this.sessionId = sessionId;
this.pageSize = pageSize;
this.props = props;
@@ -155,7 +161,7 @@ public class SessionImpl implements AbstractSession {
propertyMap.put(entry.getKey().name, entry.getValue());
}
- return new SessionBuilderImpl(qryProc, propertyMap)
+ return new SessionBuilderImpl(qryProc, transactions, propertyMap)
.defaultPageSize(pageSize);
}
@@ -174,7 +180,7 @@ public class SessionImpl implements AbstractSession {
try {
QueryContext ctx = QueryContext.create(SqlQueryType.ALL, transaction);
- result = qryProc.querySingleAsync(sessionId, ctx, query, arguments)
+ result = qryProc.querySingleAsync(sessionId, ctx, transactions, query, arguments)
.thenCompose(cur -> cur.requestNextAsync(pageSize)
.thenApply(
batchRes -> new AsyncResultSetImpl<>(
@@ -251,7 +257,7 @@ public class SessionImpl implements AbstractSession {
Object[] args = batch.get(i).toArray();
final var qryFut = tail
- .thenCompose(v -> qryProc.querySingleAsync(sessionId, ctx, query, args));
+ .thenCompose(v -> qryProc.querySingleAsync(sessionId, ctx, transactions, query, args));
tail = qryFut.thenCompose(cur -> cur.requestNextAsync(1))
.thenAccept(page -> {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 99251fa8cd..5d1baead5c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.sql.engine;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.sql.ResultSetMetadata;
-import org.jetbrains.annotations.Nullable;
/**
* Sql query cursor.
@@ -33,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
private final SqlQueryType queryType;
private final ResultSetMetadata meta;
- private final @Nullable InternalTransaction implicitTx;
+ private final QueryTransactionWrapper txWrapper;
private final AsyncCursor<T> dataCursor;
/**
@@ -46,12 +44,12 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
public AsyncSqlCursorImpl(
SqlQueryType queryType,
ResultSetMetadata meta,
- @Nullable InternalTransaction implicitTx,
+ QueryTransactionWrapper txWrapper,
AsyncCursor<T> dataCursor
) {
this.queryType = queryType;
this.meta = meta;
- this.implicitTx = implicitTx;
+ this.txWrapper = txWrapper;
this.dataCursor = dataCursor;
}
@@ -72,16 +70,14 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {
return dataCursor.requestNextAsync(rows).handle((batch, t) -> {
if (t != null) {
- if (implicitTx != null) {
- implicitTx.rollback();
- }
+ txWrapper.rollbackImplicit();
throw new CompletionException(wrapIfNecessary(t));
}
- if (implicitTx != null && !batch.hasMore()) {
+ if (!batch.hasMore()) {
// last batch, need to commit transaction
- implicitTx.commit();
+ txWrapper.commitImplicit();
}
return batch;
@@ -92,9 +88,8 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
@Override
public CompletableFuture<Void> closeAsync() {
// Commit implicit transaction, if any.
- if (implicitTx != null) {
- implicitTx.commit();
- }
+ txWrapper.commitImplicit();
+
return dataCursor.closeAsync();
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 92bacfd50d..b52ef56d34 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.session.SessionInfo;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.tx.IgniteTransactions;
/**
* QueryProcessor interface.
@@ -61,8 +62,10 @@ public interface QueryProcessor extends IgniteComponent {
*
* <p>If the query string contains more than one statement the IgniteException will be thrown.
*
+ * @param sessionId A session identifier.
* @param context User query context.
- * @param qry Single statement SQL query .
+ * @param transactions Transactions facade.
+ * @param qry Single statement SQL query.
* @param params Query parameters.
* @return Sql cursor.
*
@@ -71,6 +74,7 @@ public interface QueryProcessor extends IgniteComponent {
CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
SessionId sessionId,
QueryContext context,
+ IgniteTransactions transactions,
String qry,
Object... params
);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
new file mode 100644
index 0000000000..e39be75076
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import org.apache.ignite.internal.tx.InternalTransaction;
+
+/**
+ * Wrapper for a transaction that remembers whether it is implicit, so commit and rollback are applied to implicit transactions only.
+ */
+public class QueryTransactionWrapper {
+ private final boolean implicit;
+
+ private final InternalTransaction transaction;
+
+ QueryTransactionWrapper(InternalTransaction transaction, boolean implicit) {
+ this.transaction = transaction;
+ this.implicit = implicit;
+ }
+
+ /**
+ * Returns the wrapped transaction as passed to the constructor, whether or not it is implicit.
+ */
+ InternalTransaction unwrap() {
+ return transaction;
+ }
+
+ /**
+ * Commits the wrapped transaction if this wrapper was created as implicit, otherwise does nothing.
+ */
+ void commitImplicit() {
+ if (implicit) {
+ transaction.commit();
+ }
+ }
+
+ /**
+ * Rolls back the wrapped transaction if this wrapper was created as implicit, otherwise does nothing.
+ */
+ void rollbackImplicit() {
+ if (implicit) {
+ transaction.rollback();
+ }
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 647cc4e934..33445d9aac 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -31,7 +31,6 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@@ -69,6 +68,7 @@ import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
@@ -91,7 +91,6 @@ import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
@@ -99,11 +98,13 @@ import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.SchemaNotFoundException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
- * SqlQueryProcessor.
+ * SqlQueryProcessor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class SqlQueryProcessor implements QueryProcessor {
@@ -172,9 +173,6 @@ public class SqlQueryProcessor implements QueryProcessor {
private volatile SqlSchemaManager sqlSchemaManager;
- /** Transaction manager. */
- private final TxManager txManager;
-
/** Distribution zones manager. */
private final DistributionZoneManager distributionZoneManager;
@@ -198,7 +196,6 @@ public class SqlQueryProcessor implements QueryProcessor {
IndexManager indexManager,
SchemaManager schemaManager,
DataStorageManager dataStorageManager,
- TxManager txManager,
DistributionZoneManager distributionZoneManager,
Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier,
ReplicaService replicaService,
@@ -212,7 +209,6 @@ public class SqlQueryProcessor implements QueryProcessor {
this.indexManager = indexManager;
this.schemaManager = schemaManager;
this.dataStorageManager = dataStorageManager;
- this.txManager = txManager;
this.distributionZoneManager = distributionZoneManager;
this.dataStorageFieldsSupplier = dataStorageFieldsSupplier;
this.replicaService = replicaService;
@@ -367,14 +363,18 @@ public class SqlQueryProcessor implements QueryProcessor {
/** {@inheritDoc} */
@Override
public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
- SessionId sessionId, QueryContext context, String qry, Object... params
+ SessionId sessionId,
+ QueryContext context,
+ IgniteTransactions transactions,
+ String qry,
+ Object... params
) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
}
try {
- return querySingle0(sessionId, context, qry, params);
+ return querySingle0(sessionId, context, transactions, qry, params);
} finally {
busyLock.leaveBusy();
}
@@ -401,6 +401,7 @@ public class SqlQueryProcessor implements QueryProcessor {
private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
SessionId sessionId,
QueryContext context,
+ IgniteTransactions transactions,
String sql,
Object... params
) {
@@ -429,93 +430,120 @@ public class SqlQueryProcessor implements QueryProcessor {
return CompletableFuture.failedFuture(new SessionNotFoundException(sessionId));
}
- CompletableFuture<Void> start = new CompletableFuture<>();
+ CompletableFuture<AsyncSqlCursor<List<Object>>> start = new CompletableFuture<>();
- AtomicReference<InternalTransaction> tx = new AtomicReference<>();
+ CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start.thenCompose(ignored -> {
+ ParsedResult result = parserService.parse(sql);
- CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
- .thenCompose(ignored -> {
- ParsedResult result = parserService.parse(sql);
+ validateParsedStatement(context, result, params);
- validateParsedStatement(context, outerTx, result, params);
+ QueryTransactionWrapper txWrapper = wrapTxOrStartImplicit(result.queryType(), transactions, outerTx);
- boolean rwOp = dataModificationOp(result);
+ return waitForActualSchema(schemaName, txWrapper.unwrap().startTimestamp())
+ .thenCompose(schema -> {
+ BaseQueryContext ctx = BaseQueryContext.builder()
+ .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+ .logger(LOG)
+ .cancel(queryCancel)
+ .parameters(params).build();
- boolean implicitTxRequired = outerTx == null;
+ return prepareSvc.prepareAsync(result, ctx).thenApply(plan -> executePlan(session, txWrapper, ctx, plan));
+ }).whenComplete((res, ex) -> {
+ if (ex != null) {
+ txWrapper.rollbackImplicit();
+ }
+ });
+ });
- InternalTransaction currentTx = implicitTxRequired ? txManager.begin(!rwOp, null) : outerTx;
+ // TODO IGNITE-20078 Improve (or remove) CancellationException handling.
+ stage.whenComplete((cur, ex) -> {
+ if (ex instanceof CancellationException) {
+ queryCancel.cancel();
+ }
+ });
- tx.set(currentTx);
+ start.completeAsync(() -> null, taskExecutor);
- // TODO IGNITE-18733: wait for actual metadata for TX.
- HybridTimestamp txTimestamp = currentTx.startTimestamp();
+ return stage;
+ }
- SchemaPlus schema = sqlSchemaManager.schema(schemaName, txTimestamp.longValue());
+ private CompletableFuture<SchemaPlus> waitForActualSchema(String schemaName, HybridTimestamp timestamp) {
+ try {
+ // TODO IGNITE-18733: wait for actual metadata for TX.
+ SchemaPlus schema = sqlSchemaManager.schema(schemaName, timestamp.longValue());
- if (schema == null) {
- return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName));
- }
+ if (schema == null) {
+ return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName));
+ }
- BaseQueryContext ctx = BaseQueryContext.builder()
- .frameworkConfig(
- Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .build()
- )
- .logger(LOG)
- .cancel(queryCancel)
- .parameters(params)
- .build();
-
- return prepareSvc.prepareAsync(result, ctx)
- .thenApply(plan -> {
- var dataCursor = executionSrvc.executePlan(tx.get(), plan, ctx);
-
- SqlQueryType queryType = plan.type();
- assert queryType != null : "Expected a full plan but got a fragment: " + plan;
-
- numberOfOpenCursors.incrementAndGet();
-
- return new AsyncSqlCursorImpl<>(
- queryType,
- plan.metadata(),
- implicitTxRequired ? tx.get() : null,
- new AsyncCursor<List<Object>>() {
- @Override
- public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
- session.touch();
-
- return dataCursor.requestNextAsync(rows);
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- session.touch();
- numberOfOpenCursors.decrementAndGet();
-
- return dataCursor.closeAsync();
- }
- }
- );
- });
- });
+ return CompletableFuture.completedFuture(schema);
+ } catch (Throwable t) {
+ return CompletableFuture.failedFuture(t);
+ }
+ }
- stage.whenComplete((cur, ex) -> {
- if (ex instanceof CancellationException) {
- queryCancel.cancel();
- }
+ private AsyncSqlCursor<List<Object>> executePlan(
+ Session session,
+ QueryTransactionWrapper txWrapper,
+ BaseQueryContext ctx,
+ QueryPlan plan
+ ) {
+ var dataCursor = executionSrvc.executePlan(txWrapper.unwrap(), plan, ctx);
+
+ SqlQueryType queryType = plan.type();
+ assert queryType != null : "Expected a full plan but got a fragment: " + plan;
- if (ex != null && outerTx == null) {
- InternalTransaction tx0 = tx.get();
- if (tx0 != null) {
- tx0.rollback();
+ numberOfOpenCursors.incrementAndGet();
+
+ return new AsyncSqlCursorImpl<>(
+ queryType,
+ plan.metadata(),
+ txWrapper,
+ new AsyncCursor<List<Object>>() {
+ @Override
+ public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
+ session.touch();
+
+ return dataCursor.requestNextAsync(rows);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ session.touch();
+ numberOfOpenCursors.decrementAndGet();
+
+ return dataCursor.closeAsync();
+ }
}
- }
- });
+ );
+ }
- start.completeAsync(() -> null, taskExecutor);
+ /**
+ * Creates a new transaction wrapper using an existing outer transaction or starting a new "implicit" transaction.
+ *
+ * @param queryType Query type.
+ * @param transactions Transactions facade.
+ * @param outerTx Outer transaction.
+ * @return Wrapper for an active transaction.
+ * @throws SqlException If an outer transaction was started for a {@link SqlQueryType#DDL DDL} query.
+ */
+ static QueryTransactionWrapper wrapTxOrStartImplicit(
+ SqlQueryType queryType,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction outerTx
+ ) {
+ if (outerTx == null) {
+ InternalTransaction tx = (InternalTransaction) transactions.begin(
+ new TransactionOptions().readOnly(queryType != SqlQueryType.DML));
- return stage;
+ return new QueryTransactionWrapper(tx, true);
+ }
+
+ if (SqlQueryType.DDL == queryType) {
+ throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions.");
+ }
+
+ return new QueryTransactionWrapper(outerTx, false);
}
@TestOnly
@@ -548,11 +576,11 @@ public class SqlQueryProcessor implements QueryProcessor {
@Override
public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) {
return schemaHolder.onTableCreated(
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
- DEFAULT_SCHEMA_NAME,
- parameters.tableId(),
- parameters.causalityToken()
- )
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+ DEFAULT_SCHEMA_NAME,
+ parameters.tableId(),
+ parameters.causalityToken()
+ )
.thenApply(v -> false);
}
}
@@ -566,11 +594,11 @@ public class SqlQueryProcessor implements QueryProcessor {
@Override
public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) {
return schemaHolder.onTableUpdated(
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
- DEFAULT_SCHEMA_NAME,
- parameters.tableId(),
- parameters.causalityToken()
- )
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+ DEFAULT_SCHEMA_NAME,
+ parameters.tableId(),
+ parameters.causalityToken()
+ )
.thenApply(v -> false);
}
}
@@ -584,11 +612,11 @@ public class SqlQueryProcessor implements QueryProcessor {
@Override
public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) {
return schemaHolder.onTableDropped(
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
- DEFAULT_SCHEMA_NAME,
- parameters.tableId(),
- parameters.causalityToken()
- )
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+ DEFAULT_SCHEMA_NAME,
+ parameters.tableId(),
+ parameters.causalityToken()
+ )
.thenApply(v -> false);
}
}
@@ -602,12 +630,12 @@ public class SqlQueryProcessor implements QueryProcessor {
@Override
public CompletableFuture<Boolean> notify(IndexEventParameters parameters, @Nullable Throwable exception) {
return schemaHolder.onIndexDropped(
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
- DEFAULT_SCHEMA_NAME,
- parameters.tableId(),
- parameters.indexId(),
- parameters.causalityToken()
- )
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
+ DEFAULT_SCHEMA_NAME,
+ parameters.tableId(),
+ parameters.indexId(),
+ parameters.causalityToken()
+ )
.thenApply(v -> false);
}
}
@@ -621,24 +649,18 @@ public class SqlQueryProcessor implements QueryProcessor {
@Override
public CompletableFuture<Boolean> notify(IndexEventParameters parameters, @Nullable Throwable exception) {
return schemaHolder.onIndexCreated(
- parameters.tableId(),
- parameters.indexId(),
- parameters.indexDescriptor(),
- parameters.causalityToken()
- )
+ parameters.tableId(),
+ parameters.indexId(),
+ parameters.indexDescriptor(),
+ parameters.causalityToken()
+ )
.thenApply(v -> false);
}
}
- /** Returns {@code true} if this is data modification operation. */
- private static boolean dataModificationOp(ParsedResult parsedResult) {
- return parsedResult.queryType() == SqlQueryType.DML;
- }
-
/** Performs additional validation of a parsed statement. **/
private static void validateParsedStatement(
QueryContext context,
- @Nullable InternalTransaction outerTx,
ParsedResult parsedResult,
Object[] params
) {
@@ -651,10 +673,6 @@ public class SqlQueryProcessor implements QueryProcessor {
throw new QueryValidationException(message);
}
- if (SqlQueryType.DDL == queryType && outerTx != null) {
- throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions.");
- }
-
if (parsedResult.dynamicParamsCount() != params.length) {
String message = format(
"Unexpected number of query parameters. Provided {} but there is only {} dynamic parameter(s).",
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
index dec2b80a98..eb30c61bcb 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
@@ -21,7 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Collections;
import java.util.List;
@@ -48,70 +47,66 @@ public class AsyncSqlCursorImplTest {
private static final ResultSetMetadata RESULT_SET_METADATA = new ResultSetMetadataImpl(Collections.emptyList());
/** Cursor should trigger commit of implicit transaction (if any) only if data is fully read. */
- @ParameterizedTest
+ @ParameterizedTest(name = "{0}")
@MethodSource("transactions")
- public void testTriggerCommitAfterDataIsFullyRead(NoOpTransaction implicitTx) {
+ public void testTriggerCommitAfterDataIsFullyRead(boolean implicit, QueryTransactionWrapper txWrapper) {
List<Integer> list = List.of(1, 2, 3);
- AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx,
+ AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper,
new AsyncWrapper<>(CompletableFuture.completedFuture(list.iterator()), Runnable::run));
int requestRows = 2;
BatchedResult<Integer> in1 = cursor.requestNextAsync(requestRows).join();
assertEquals(in1.items(), list.subList(0, requestRows));
- if (implicitTx != null) {
- CompletableFuture<Void> f = implicitTx.commitFuture();
- assertFalse(f.isDone(), "Implicit transaction should have not been committed because there is more data.");
- }
+ assertFalse(((NoOpTransaction) txWrapper.unwrap()).commitFuture().isDone(),
+ "Implicit transaction should have not been committed because there is more data.");
BatchedResult<Integer> in2 = cursor.requestNextAsync(requestRows).join();
assertEquals(in2.items(), list.subList(requestRows, list.size()));
- if (implicitTx != null) {
- CompletableFuture<Void> f = implicitTx.commitFuture();
- assertTrue(f.isDone(), "Implicit transaction should been committed because there is no more data");
- }
+ CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).commitFuture();
+ assertEquals(implicit, f.isDone(), "Implicit transaction should been committed because there is no more data");
}
/** Exception on read should trigger rollback of implicit transaction, if any. */
- @ParameterizedTest
+ @ParameterizedTest(name = "{0}")
@MethodSource("transactions")
- public void testExceptionRollbacksImplicitTx(NoOpTransaction implicitTx) {
+ public void testExceptionRollbacksImplicitTx(boolean implicit, QueryTransactionWrapper txWrapper) {
IgniteException err = new IgniteException(Common.INTERNAL_ERR);
- AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx,
+ AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper,
new AsyncWrapper<>(CompletableFuture.failedFuture(err), Runnable::run));
CompletionException t = assertThrows(CompletionException.class, () -> cursor.requestNextAsync(1).join());
- if (implicitTx != null) {
- CompletableFuture<Void> f = implicitTx.rollbackFuture();
- assertTrue(f.isDone(), "Implicit transaction should have been rolled back: " + f);
- }
+ CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).rollbackFuture();
+ assertEquals(implicit, f.isDone(), "Implicit transaction should have been rolled back: " + f);
IgniteException igniteErr = assertInstanceOf(IgniteException.class, t.getCause());
assertEquals(err.codeAsString(), igniteErr.codeAsString());
}
/** Cursor close should trigger commit of implicit transaction, if any. */
- @ParameterizedTest
+ @ParameterizedTest(name = "{0}")
@MethodSource("transactions")
- public void testCloseCommitsImplicitTx(NoOpTransaction implicitTx) {
+ public void testCloseCommitsImplicitTx(boolean implicit, QueryTransactionWrapper txWrapper) {
AsyncCursor<Integer> data = new AsyncWrapper<>(List.of(1, 2, 3, 4).iterator());
- AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, data);
+ AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper, data);
cursor.closeAsync().join();
- if (implicitTx != null) {
- CompletableFuture<Void> f = implicitTx.commitFuture();
- assertTrue(f.isDone(), "Implicit transaction should have been committed: " + f);
- }
+ CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).commitFuture();
+ assertEquals(implicit, f.isDone(), "Implicit transaction should have been committed: " + f);
}
private static Stream<Arguments> transactions() {
return Stream.of(
- Arguments.of(Named.named("implicit-tx", NoOpTransaction.readOnly("TX"))),
- Arguments.of(Named.named("no implicit-tx", null))
+ Arguments.of(Named.named("implicit-tx", true), newTxWrapper(true)),
+ Arguments.of(Named.named("explicit-tx", false), newTxWrapper(false))
);
}
+
+ private static QueryTransactionWrapper newTxWrapper(boolean implicit) {
+ return new QueryTransactionWrapper(NoOpTransaction.readOnly("TX"), implicit);
+ }
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
new file mode 100644
index 0000000000..f3c9d0e659
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.sql.engine.SqlQueryProcessor.wrapTxOrStartImplicit;
+import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.EnumSet;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.ErrorGroups;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for class {@link QueryTransactionWrapper}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class QueryTransactionWrapperSelfTest extends BaseIgniteAbstractTest {
+ @Mock
+ private IgniteTransactions transactions;
+
+ @Test
+ public void throwsExceptionForDdlWithExternalTransaction() {
+ //noinspection ThrowableNotThrown
+ assertThrowsSqlException(ErrorGroups.Sql.STMT_VALIDATION_ERR,
+ () -> wrapTxOrStartImplicit(SqlQueryType.DDL, transactions, new NoOpTransaction("test")));
+ verifyNoInteractions(transactions);
+ }
+
+ @Test
+ public void testImplicitTransactionAttributes() {
+ when(transactions.begin(any())).thenAnswer(
+ inv -> {
+ boolean readOnly = inv.getArgument(0, TransactionOptions.class).readOnly();
+
+ return readOnly ? NoOpTransaction.readOnly("test-ro") : NoOpTransaction.readWrite("test-rw");
+ }
+ );
+
+ assertThat(wrapTxOrStartImplicit(SqlQueryType.DML, transactions, null).unwrap().isReadOnly(), equalTo(false));
+
+ for (SqlQueryType type : EnumSet.complementOf(EnumSet.of(SqlQueryType.DML))) {
+ assertThat(wrapTxOrStartImplicit(type, transactions, null).unwrap().isReadOnly(), equalTo(true));
+ }
+
+ verify(transactions, times(SqlQueryType.values().length)).begin(any());
+ verifyNoMoreInteractions(transactions);
+ }
+
+ @Test
+ public void commitAndRollbackNotAffectExternalTransaction() {
+ NoOpTransaction externalTx = new NoOpTransaction("test");
+ // commitImplicit() must be a no-op when the wrapper holds an externally supplied transaction.
+ QueryTransactionWrapper wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, externalTx);
+ wrapper.commitImplicit();
+ assertFalse(externalTx.commitFuture().isDone());
+ // rollbackImplicit() must be a no-op as well, so the rollback future (not the commit one) is checked.
+ wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, externalTx);
+ wrapper.rollbackImplicit();
+ assertFalse(externalTx.rollbackFuture().isDone());
+
+ verifyNoInteractions(transactions);
+ }
+
+ @Test
+ public void testCommitImplicit() {
+ QueryTransactionWrapper wrapper = prepareImplicitTx();
+ NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap();
+
+ wrapper.commitImplicit();
+
+ assertThat(tx.commitFuture().isDone(), equalTo(true));
+ assertThat(tx.rollbackFuture().isDone(), equalTo(false));
+ }
+
+ @Test
+ public void testRollbackImplicit() {
+ QueryTransactionWrapper wrapper = prepareImplicitTx();
+ NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap();
+
+ wrapper.rollbackImplicit();
+
+ assertThat(tx.rollbackFuture().isDone(), equalTo(true));
+ assertThat(tx.commitFuture().isDone(), equalTo(false));
+ }
+
+ private QueryTransactionWrapper prepareImplicitTx() {
+ when(transactions.begin(any())).thenReturn(new NoOpTransaction("test"));
+
+ QueryTransactionWrapper wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, null);
+
+ assertThat(wrapper.unwrap(), instanceOf(NoOpTransaction.class));
+ NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap();
+
+ assertFalse(tx.rollbackFuture().isDone());
+ assertFalse(tx.commitFuture().isDone());
+
+ return wrapper;
+ }
+}
+
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 1fbf38976d..98aab74fb0 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@@ -76,7 +75,6 @@ import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteException;
@@ -87,6 +85,7 @@ import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -122,7 +121,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
private MessagingService msgSrvc;
@Mock
- private TxManager txManager;
+ private IgniteTransactions transactions;
@Mock
private DistributionZoneManager distributionZoneManager;
@@ -235,7 +234,6 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
indexManager,
schemaManager,
dataStorageManager,
- txManager,
distributionZoneManager,
Map::of,
mock(ReplicaService.class),
@@ -256,7 +254,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
when(tbl.storage()).thenReturn(mock(MvTableStorage.class));
when(tbl.storage().getTableDescriptor()).thenReturn(new StorageTableDescriptor(tblId, 1, "none"));
- when(txManager.begin(anyBoolean(), any())).thenReturn(new NoOpTransaction(localNode.name()));
+ when(transactions.begin(any())).thenReturn(new NoOpTransaction(localNode.name()));
qryProc.start();
@@ -268,6 +266,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
var cursors = qryProc.querySingleAsync(
sessionId,
context,
+ transactions,
"SELECT * FROM TEST"
);
@@ -293,6 +292,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest {
assertTrue(assertThrows(IgniteInternalException.class, () -> qryProc.querySingleAsync(
sessionId,
context,
+ transactions,
"SELECT 1"
)).getCause() instanceof NodeStoppingException);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index d698e996d7..e675871303 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -105,13 +105,13 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -158,6 +158,10 @@ public class MockedStructuresTest extends IgniteAbstractTest {
@Mock(lenient = true)
private TxManager tm;
+ /** Ignite transactions. */
+ @Mock(lenient = true)
+ private IgniteTransactions transactions;
+
/** Meta storage manager. */
@Mock
MetaStorageManager msm;
@@ -204,13 +208,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
SqlQueryProcessor queryProc;
- /** Test node. */
- private final ClusterNode node = new ClusterNodeImpl(
- UUID.randomUUID().toString(),
- NODE_NAME,
- new NetworkAddress("127.0.0.1", 2245)
- );
-
@InjectConfiguration
private RocksDbStorageEngineConfiguration rocksDbEngineConfig;
@@ -326,7 +323,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
idxManager,
schemaManager,
dataStorageManager,
- tm,
distributionZoneManager,
() -> dataStorageModules.collectSchemasFields(
List.of(
@@ -374,17 +370,12 @@ public class MockedStructuresTest extends IgniteAbstractTest {
*/
@Test
public void testCreateTable() {
- SqlQueryProcessor finalQueryProc = queryProc;
-
- SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
- QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
String curMethodName = getCurrentMethodName();
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
+ "with primary_zone='%s'", curMethodName, ZONE_NAME);
- readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
+ readFirst(sql(newTblSql));
assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
.equalsIgnoreCase(curMethodName)));
@@ -392,23 +383,23 @@ public class MockedStructuresTest extends IgniteAbstractTest {
String finalNewTblSql1 = newTblSql;
assertThrows(TableAlreadyExistsException.class,
- () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql1)));
+ () -> readFirst(sql(finalNewTblSql1)));
String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1 int PRIMARY KEY, c2 varbinary(255)) "
+ "with primary_zone='%s'", curMethodName, ZONE_NAME);
assertThrows(TableAlreadyExistsException.class,
- () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql2)));
+ () -> readFirst(sql(finalNewTblSql2)));
- assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
+ assertThrows(SqlException.class, () -> readFirst(sql(
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions__wrong=1,primary_zone='"
+ ZONE_NAME + "'")));
- assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
+ assertThrows(SqlException.class, () -> readFirst(sql(
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas__wrong=1,primary_zone='"
+ ZONE_NAME + "'")));
- assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
+ assertThrows(SqlException.class, () -> readFirst(sql(
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone__wrong='"
+ ZONE_NAME + "'")));
@@ -417,7 +408,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
String finalNewTblSql3 = newTblSql;
- assertDoesNotThrow(() -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql3)));
+ assertDoesNotThrow(() -> readFirst(sql(finalNewTblSql3)));
}
/**
@@ -427,28 +418,25 @@ public class MockedStructuresTest extends IgniteAbstractTest {
public void testCreateTableWithDistributionZone() {
String tableName = getCurrentMethodName().toUpperCase();
- SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
- QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) ",
tableName);
- readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
+ readFirst(sql(newTblSql));
assertEquals(getZoneId(DEFAULT_ZONE_NAME), tblsCfg.tables().get(tableName).zoneId().value());
- readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName));
+ readFirst(sql("DROP TABLE " + tableName));
int zoneId = dstZnsCfg.distributionZones().get(ZONE_NAME).zoneId().value();
newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
+ "with primary_zone='%s'", tableName, ZONE_NAME);
- readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
+ readFirst(sql(newTblSql));
assertEquals(zoneId, tblsCfg.tables().get(tableName).zoneId().value());
- readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName));
+ readFirst(sql("DROP TABLE " + tableName));
log.info("Creating a table with a non-existent distribution zone.");
@@ -458,8 +446,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
Throwable exception = assertThrows(
Throwable.class,
- () -> readFirst(queryProc.querySingleAsync(sessionId, context,
- String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
+ () -> readFirst(sql(String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
+ "with primary_zone='%s'", tableName, nonExistZone)))
);
@@ -473,29 +460,21 @@ public class MockedStructuresTest extends IgniteAbstractTest {
public void testDropTable() {
String curMethodName = getCurrentMethodName();
- SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
- QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName);
- readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
-
- readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + curMethodName));
+ readFirst(sql(newTblSql));
- SqlQueryProcessor finalQueryProc = queryProc;
+ readFirst(sql("DROP TABLE " + curMethodName));
- assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
- "DROP TABLE " + curMethodName + "_not_exist")));
+ assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE " + curMethodName + "_not_exist")));
- assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
- "DROP TABLE " + curMethodName)));
+ assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE " + curMethodName)));
- assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
- "DROP TABLE PUBLIC." + curMethodName)));
+ assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE PUBLIC." + curMethodName)));
- readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist"));
+ readFirst(sql("DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist"));
- readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName));
+ readFirst(sql("DROP TABLE IF EXISTS PUBLIC." + curMethodName));
assertTrue(tblManager.tables().stream().noneMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
@@ -505,12 +484,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
void createTableWithTableOptions() {
String method = getCurrentMethodName();
- SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
- QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
- assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync(
- sessionId,
- context,
+ assertDoesNotThrow(() -> readFirst(sql(
String.format(
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone='%s'",
method + 4,
@@ -520,9 +494,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
IgniteException exception = assertThrows(
IgniteException.class,
- () -> readFirst(queryProc.querySingleAsync(
- sessionId,
- context,
+ () -> readFirst(sql(
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) WITH %s='%s'", method + 6, method, method)
))
);
@@ -584,6 +556,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
InternalTransaction tx = mock(InternalTransaction.class);
when(tx.startTimestamp()).thenReturn(HybridTimestamp.MAX_VALUE);
when(tm.begin(anyBoolean(), any())).thenReturn(tx);
+ when(transactions.begin(any())).thenReturn(tx);
when(replicaManager.stopReplica(any())).thenReturn(completedFuture(true));
@@ -630,4 +603,11 @@ public class MockedStructuresTest extends IgniteAbstractTest {
private int getZoneId(String zoneName) {
return getZoneIdStrict(dstZnsCfg, zoneName);
}
+
+ /** Executes {@code query} in a newly created session, passing the mocked {@code transactions} to the query processor. */
+ private CompletableFuture<AsyncSqlCursor<List<Object>>> sql(String query) {
+ SessionId newSession = queryProc.createSession(PropertiesHelper.emptyHolder());
+ QueryContext allTypesCtx = QueryContext.create(SqlQueryType.ALL);
+ return queryProc.querySingleAsync(newSession, allTypesCtx, transactions, query);
+ }
}
diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
index d8974861d8..a676e3cdc2 100644
--- a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
+++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
@@ -474,7 +475,7 @@ public abstract class QueryChecker {
if (!CollectionUtils.nullOrEmpty(planMatchers) || exactPlan != null) {
CompletableFuture<AsyncSqlCursor<List<Object>>> explainCursors = qryProc.querySingleAsync(sessionId,
- context, "EXPLAIN PLAN FOR " + qry, params);
+ context, transactions(), "EXPLAIN PLAN FOR " + qry, params);
AsyncSqlCursor<List<Object>> explainCursor = await(explainCursors);
List<List<Object>> explainRes = getAllFromCursor(explainCursor);
@@ -491,7 +492,9 @@ public abstract class QueryChecker {
}
}
// Check result.
- CompletableFuture<AsyncSqlCursor<List<Object>>> cursors = qryProc.querySingleAsync(sessionId, context, qry, params);
+ CompletableFuture<AsyncSqlCursor<List<Object>>> cursors =
+ qryProc.querySingleAsync(sessionId, context, transactions(), qry, params);
+
AsyncSqlCursor<List<Object>> cur = await(cursors);
checkMetadata(cur);
@@ -553,6 +556,8 @@ public abstract class QueryChecker {
protected abstract QueryProcessor getEngine();
+ protected abstract IgniteTransactions transactions();
+
protected void checkMetadata(AsyncSqlCursor<?> cursor) {
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 4005572fcc..056ec46767 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -76,7 +76,7 @@ abstract class AbstractTableView {
return (IgniteException) th;
}
- //TODO: IGNITE-14500 Replace with public exception with an error code (or unwrap?).
+ //TODO: IGNITE-20181 KV/Binary view public API should only throw public exceptions for the end user.
return new IgniteException(th);
}
}