You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/10/27 17:19:50 UTC
[ignite-3] branch ignite-3.0.0-beta1 updated: IGNITE-17330 Support read only transactions from sql side - Fixes #1232.
This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
new 7b8b271d49 IGNITE-17330 Support read only transactions from sql side - Fixes #1232.
7b8b271d49 is described below
commit 7b8b271d49e76d37ea29de0d20120728fa2040d0
Author: zstan <st...@gmail.com>
AuthorDate: Thu Oct 27 20:12:56 2022 +0300
IGNITE-17330 Support read only transactions from sql side - Fixes #1232.
Signed-off-by: zstan <st...@gmail.com>
(cherry picked from commit d1405efac259beee360f04cdfed06d629d7780d4)
---
.../app/jdbc/ItJdbcInsertStatementSelfTest.java | 2 +-
.../runner/app/jdbc/ItJdbcStatementSelfTest.java | 2 +-
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 57 ++++++++++++++++++++++
.../internal/sql/api/AsyncResultSetImpl.java | 6 ---
.../internal/sql/engine/SqlQueryProcessor.java | 31 +++++++++---
.../sql/engine/exec/ExchangeServiceImpl.java | 9 ++--
.../internal/sql/engine/exec/ExecutionContext.java | 19 +++++---
.../sql/engine/exec/ExecutionServiceImpl.java | 27 +++++-----
.../sql/engine/exec/LogicalRelImplementor.java | 8 +--
.../sql/engine/exec/rel/TableScanNode.java | 8 ++-
.../sql/engine/message/QueryStartRequest.java | 8 +++
.../sql/engine/metadata/FragmentDescription.java | 2 +-
.../sql/engine/prepare/PrepareServiceImpl.java | 5 +-
.../internal/sql/engine/trait/TraitUtils.java | 16 ++++++
.../internal/sql/engine/util/BaseQueryContext.java | 21 +++++++-
.../internal/sql/engine/StopCalciteModuleTest.java | 2 +
.../sql/engine/exec/ExecutionServiceImplTest.java | 6 ++-
.../sql/engine/exec/RuntimeSortedIndexTest.java | 4 +-
.../sql/engine/exec/rel/AbstractExecutionTest.java | 9 +++-
19 files changed, 191 insertions(+), 51 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcInsertStatementSelfTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcInsertStatementSelfTest.java
index dbb332b917..db195269f2 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcInsertStatementSelfTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcInsertStatementSelfTest.java
@@ -206,7 +206,7 @@ public class ItJdbcInsertStatementSelfTest extends ItJdbcAbstractStatementSelfTe
assertTrue(resultSet.next());
- assertEquals(3, resultSet.getInt(1));
+ assertEquals(1, resultSet.getInt(1));
doCheck();
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcStatementSelfTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcStatementSelfTest.java
index fb70a97aeb..1c75419cb5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcStatementSelfTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/jdbc/ItJdbcStatementSelfTest.java
@@ -119,7 +119,7 @@ public class ItJdbcStatementSelfTest extends ItJdbcAbstractStatementSelfTest {
int val = rs.getInt(1);
- assertTrue(val >= 1 && val <= 10, "Invalid val: " + val);
+ assertEquals(5, val);
}
stmt.close();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index e6e5de129a..6b651caef0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -76,6 +76,7 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -321,6 +322,16 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
rs.closeAsync();
+ outerTx = igniteTx().begin();
+
+ rs = ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY VAL0").get();
+
+ assertEquals(2 * ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+
+ rs.closeAsync();
+
+ outerTx.commit();
+
checkDml(2 * ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
checkDml(2 * ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
@@ -332,6 +343,51 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
assertEquals(txManagerInternal.finished(), states.size());
}
+ /** Check correctness of rw and ro transactions. */
+ @Test
+ public void checkMixedTransactions() throws Exception {
+ IgniteSql sql = igniteSql();
+
+ if (sql instanceof ClientSql) {
+ return;
+ }
+
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ Session ses = sql.createSession();
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ checkTx(ses, true, false, true);
+ checkTx(ses, true, false, false);
+ checkTx(ses, true, true, true);
+ checkTx(ses, true, true, false);
+ checkTx(ses, false, true, true);
+ checkTx(ses, false, true, false);
+ checkTx(ses, false, false, true);
+ checkTx(ses, false, false, false);
+ }
+
+ private void checkTx(Session ses, boolean readOnly, boolean commit, boolean explicit) throws Exception {
+ Transaction outerTx = explicit ? (readOnly ? igniteTx().readOnly().begin() : igniteTx().begin()) : null;
+
+ AsyncResultSet rs = ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY VAL0").get();
+
+ assertEquals(ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+
+ rs.closeAsync();
+
+ if (outerTx != null) {
+ if (commit) {
+ outerTx.commit();
+ } else {
+ outerTx.rollback();
+ }
+ }
+ }
+
@Test
public void select() throws ExecutionException, InterruptedException {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
@@ -521,6 +577,7 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
}
}
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17998")
@Test
public void closeSession() throws ExecutionException, InterruptedException {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 39b64a5694..a1d46be888 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -67,12 +67,6 @@ public class AsyncResultSetImpl implements AsyncResultSet {
this.curPage = page;
this.pageSize = pageSize;
this.closeRun = closeRun;
-
- assert cur.queryType() == SqlQueryType.QUERY
- || ((cur.queryType() == SqlQueryType.DML || cur.queryType() == SqlQueryType.DDL)
- && curPage.items().size() == 1
- && curPage.items().get(0).size() == 1
- && !curPage.hasMore()) : "Invalid query result: [type=" + cur.queryType() + "res=" + curPage + ']';
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 98cf2e9af4..3f86aae8af 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -38,10 +38,13 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.index.event.IndexEvent;
import org.apache.ignite.internal.index.event.IndexEventParameters;
@@ -61,7 +64,6 @@ import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan.Type;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
@@ -185,7 +187,7 @@ public class SqlQueryProcessor implements QueryProcessor {
));
var exchangeService = registerService(new ExchangeServiceImpl(
- nodeName,
+ clusterSrvc.topologyService().localMember(),
taskExecutor,
mailboxRegistry,
msgSrvc
@@ -387,17 +389,22 @@ public class SqlQueryProcessor implements QueryProcessor {
return nodes.get(0);
})
.thenCompose(sqlNode -> {
+ final boolean rwOp = dataModificationOp(sqlNode);
+ final HybridTimestamp txTime = outerTx != null ? outerTx.readTimestamp() : rwOp ? null : new HybridClock().now();
+
BaseQueryContext ctx = BaseQueryContext.builder()
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
.defaultSchema(schema)
- .traitDefs(Commons.LOCAL_TRAITS_SET)
+ .traitDefs(rwOp || (outerTx != null && !outerTx.isReadOnly()) ? Commons.LOCAL_TRAITS_SET :
+ Commons.DISTRIBUTED_TRAITS_SET)
.build()
)
.logger(LOG)
.cancel(queryCancel)
.parameters(params)
.transaction(outerTx)
+ .transactionTime(txTime)
.plannerTimeout(PLANNER_TIMEOUT)
.build();
@@ -406,9 +413,8 @@ public class SqlQueryProcessor implements QueryProcessor {
context.maybeUnwrap(QueryValidator.class)
.ifPresent(queryValidator -> queryValidator.validatePlan(plan));
- // Transactional DDL is not supported as well as RO transactions, hence
- // only DML requiring RW transaction is covered
- boolean implicitTxRequired = (plan.type() == Type.DML || plan.type() == Type.QUERY) && outerTx == null;
+ boolean implicitTxRequired = outerTx == null && rwOp;
+
InternalTransaction implicitTx = implicitTxRequired ? txManager.begin() : null;
BaseQueryContext enrichedContext =
@@ -469,16 +475,22 @@ public class SqlQueryProcessor implements QueryProcessor {
CompletableFuture<Void> start = new CompletableFuture<>();
for (SqlNode sqlNode : nodes) {
+ boolean needStartTx = SqlKind.DML.contains(sqlNode.getKind()) || SqlKind.QUERY.contains(sqlNode.getKind());
+ // Only rw transactions for now.
+ InternalTransaction implicitTx = needStartTx ? txManager.begin() : null;
+
final BaseQueryContext ctx = BaseQueryContext.builder()
.cancel(new QueryCancel())
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .traitDefs(needStartTx ? Commons.LOCAL_TRAITS_SET : Commons.DISTRIBUTED_TRAITS_SET)
.defaultSchema(schema)
.build()
)
.logger(LOG)
.parameters(params)
.plannerTimeout(PLANNER_TIMEOUT)
+ .transaction(implicitTx)
.build();
// TODO https://issues.apache.org/jira/browse/IGNITE-17746 Fix query execution flow.
@@ -490,7 +502,7 @@ public class SqlQueryProcessor implements QueryProcessor {
return new AsyncSqlCursorImpl<>(
SqlQueryType.mapPlanTypeToSqlType(plan.type()),
plan.metadata(),
- null,
+ implicitTx,
executionSrvc.executePlan(plan, ctx)
);
});
@@ -620,4 +632,9 @@ public class SqlQueryProcessor implements QueryProcessor {
.thenApply(v -> false);
}
}
+
+ /** Returns {@code true} if this is data modification operation. */
+ private static boolean dataModificationOp(SqlNode sqlNode) {
+ return SqlKind.DML.contains(sqlNode.getKind());
+ }
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index 1ec7f53dc0..73d94a4ef0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
/**
* ExchangeServiceImpl. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -49,7 +50,7 @@ public class ExchangeServiceImpl implements ExchangeService {
private static final SqlQueryMessagesFactory FACTORY = new SqlQueryMessagesFactory();
- private final String localNodeId;
+ private final ClusterNode localNode;
private final QueryTaskExecutor taskExecutor;
@@ -61,12 +62,12 @@ public class ExchangeServiceImpl implements ExchangeService {
* Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public ExchangeServiceImpl(
- String localNodeId,
+ ClusterNode localNode,
QueryTaskExecutor taskExecutor,
MailboxRegistry mailboxRegistry,
MessageService msgSrvc
) {
- this.localNodeId = localNodeId;
+ this.localNode = localNode;
this.taskExecutor = taskExecutor;
this.mailboxRegistry = mailboxRegistry;
this.msgSrvc = msgSrvc;
@@ -221,7 +222,7 @@ public class ExchangeServiceImpl implements ExchangeService {
.build(),
taskExecutor,
qryId,
- localNodeId,
+ localNode,
nodeId,
new FragmentDescription(
fragmentId,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 14f802ef1a..a544a393c4 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -32,6 +32,7 @@ import java.util.function.Consumer;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
@@ -45,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -74,7 +76,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
private final Map<String, Object> params;
- private final String locNodeId;
+ private final ClusterNode localNode;
private final String originatingNodeId;
@@ -111,7 +113,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
BaseQueryContext qctx,
QueryTaskExecutor executor,
UUID qryId,
- String locNodeId,
+ ClusterNode localNode,
String originatingNodeId,
FragmentDescription fragmentDesc,
RowHandler<RowT> handler,
@@ -126,7 +128,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
this.fragmentDesc = fragmentDesc;
this.handler = handler;
this.params = params;
- this.locNodeId = locNodeId;
+ this.localNode = localNode;
this.originatingNodeId = originatingNodeId;
this.tx = tx;
@@ -219,10 +221,10 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
}
/**
- * Get local node ID.
+ * Get local node.
*/
- public String localNodeId() {
- return locNodeId;
+ public ClusterNode localNode() {
+ return localNode;
}
/** {@inheritDoc} */
@@ -348,6 +350,11 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
return tx;
}
+ /** Read only transaction time. */
+ public HybridTimestamp transactionTime() {
+ return qctx.transactionTime();
+ }
+
/**
* Sets cancel flag, returns {@code true} if flag was changed by this call.
*
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index ab71eab935..e1cd92462c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.configuration.ConfigurationChangeException;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -91,7 +92,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
private final MessageService msgSrvc;
- private final String locNodeId;
+ private final ClusterNode localNode;
private final SqlSchemaManager sqlSchemaManager;
@@ -138,7 +139,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
DataStorageManager dataStorageManager
) {
return new ExecutionServiceImpl<>(
- topSrvc.localMember().id(),
+ topSrvc.localMember(),
msgSrvc,
new MappingServiceImpl(topSrvc),
sqlSchemaManager,
@@ -154,7 +155,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
* Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
ExecutionServiceImpl(
- String localNodeId,
+ ClusterNode localNode,
MessageService msgSrvc,
MappingService mappingSrvc,
SqlSchemaManager sqlSchemaManager,
@@ -164,7 +165,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
ExchangeService exchangeSrvc,
ImplementorFactory<RowT> implementorFactory
) {
- this.locNodeId = localNodeId;
+ this.localNode = localNode;
this.handler = handler;
this.msgSrvc = msgSrvc;
this.mappingSrvc = mappingSrvc;
@@ -201,7 +202,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
return queryManager.execute(plan);
}
- private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params) {
+ private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params, HybridTimestamp txTime) {
return BaseQueryContext.builder()
.queryId(queryId)
.parameters(params)
@@ -211,6 +212,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
.build()
)
.logger(LOG)
+ .transactionTime(txTime)
.build();
}
@@ -291,7 +293,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
assert nodeId != null && msg != null;
DistributedQueryManager queryManager = queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
- BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters());
+ BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters(), msg.txTime());
return new DistributedQueryManager(ctx);
});
@@ -382,15 +384,15 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
private volatile Long rootFragmentId = null;
- private @Nullable InternalTransaction transaction;
+ private @Nullable InternalTransaction tx;
private DistributedQueryManager(
BaseQueryContext ctx,
- @Nullable InternalTransaction transaction
+ @Nullable InternalTransaction tx
) {
this(ctx);
- this.transaction = transaction;
+ this.tx = tx;
}
private DistributedQueryManager(BaseQueryContext ctx) {
@@ -419,6 +421,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
.root(fragment.serialized())
.fragmentDescription(desc)
.parameters(ctx.parameters())
+ .txTime(ctx.transactionTime())
.build();
var fut = new CompletableFuture<Void>();
@@ -520,12 +523,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
ctx,
taskExecutor,
ctx.queryId(),
- locNodeId,
+ localNode,
initiatorNodeId,
desc,
handler,
Commons.parametersMap(ctx.parameters()),
- transaction
+ tx
);
}
@@ -558,7 +561,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
private AsyncCursor<List<Object>> execute(MultiStepPlan plan) {
taskExecutor.execute(() -> {
- plan.init(mappingSrvc, new MappingQueryContext(locNodeId));
+ plan.init(mappingSrvc, new MappingQueryContext(localNode.id()));
List<Fragment> fragments = plan.fragments();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index dc9208d9a7..5d31e63046 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -188,7 +188,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
IgniteDistribution distr = rel.distribution();
Destination<RowT> dest = distr.destination(ctx, affSrvc, ctx.group(rel.sourceId()));
- String localNodeId = ctx.localNodeId();
+ String localNodeId = ctx.localNode().id();
FilterNode<RowT> node = new FilterNode<>(ctx, rel.getRowType(), r -> Objects.equals(localNodeId, first(dest.targets(r))));
@@ -303,7 +303,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
IgniteIndex idx = tbl.getIndex(rel.indexName());
ColocationGroup group = ctx.group(rel.sourceId());
- int[] parts = group.partitions(ctx.localNodeId());
+ int[] parts = group.partitions(ctx.localNode().id());
return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, ranges, filters, prj, requiredColumns.toBitSet());
}
@@ -328,7 +328,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
ColocationGroup group = ctx.group(rel.sourceId());
- if (!group.nodeIds().contains(ctx.localNodeId())) {
+ if (!group.nodeIds().contains(ctx.localNode().id())) {
return new ScanNode<>(ctx, rowType, Collections.emptyList());
}
@@ -336,7 +336,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
ctx,
rowType,
tbl,
- group.partitions(ctx.localNodeId()),
+ group.partitions(ctx.localNode().id()),
filters,
prj,
requiredColumns
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index 8dcd4f3fb4..d902e71c7c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -95,6 +95,8 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
assert !nullOrEmpty(parts);
+ assert context().transaction() != null || context().transactionTime() != null : "Transaction not initialized.";
+
this.physTable = schemaTable.table();
this.schemaTable = schemaTable;
this.parts = parts;
@@ -218,7 +220,11 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
if (subscription != null) {
subscription.request(waiting);
} else if (curPartIdx < parts.length) {
- physTable.scan(parts[curPartIdx++], context().transaction()).subscribe(new SubscriberImpl());
+ if (context().transactionTime() != null) {
+ physTable.scan(parts[curPartIdx++], context().transactionTime(), context().localNode()).subscribe(new SubscriberImpl());
+ } else {
+ physTable.scan(parts[curPartIdx++], context().transaction()).subscribe(new SubscriberImpl());
+ }
} else {
waiting = NOT_WAITING;
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index 85be0a59d2..73d85c97a7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.sql.engine.message;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
/**
* QueryStartRequest interface.
@@ -47,4 +49,10 @@ public interface QueryStartRequest extends ExecutionContextAwareMessage {
*/
@Marshallable
Object[] parameters();
+
+ /**
+ * Read only transaction time or null if this is read write transaction.
+ */
+ @Marshallable
+ @Nullable HybridTimestamp txTime();
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
index 674ffa29a0..abd28b6965 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
@@ -82,7 +82,7 @@ public class FragmentDescription implements Serializable {
}
/**
- * Get mappring.
+ * Get mapping.
*/
public FragmentMapping mapping() {
return mapping;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 70a956d561..7f79156fae 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.prepare;
import static org.apache.ignite.internal.sql.engine.prepare.PlannerHelper.optimize;
+import static org.apache.ignite.internal.sql.engine.trait.TraitUtils.distributionPresent;
import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_VALIDATION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.USUPPORTED_SQL_OPERATION_KIND_ERR;
@@ -215,7 +216,9 @@ public class PrepareServiceImpl implements PrepareService, SchemaUpdateListener
}
private CompletableFuture<QueryPlan> prepareQuery(SqlNode sqlNode, PlanningContext ctx) {
- var key = new CacheKey(ctx.schemaName(), sqlNode.toString());
+ boolean distributed = distributionPresent(ctx.config().getTraitDefs());
+
+ var key = new CacheKey(ctx.schemaName(), sqlNode.toString(), distributed);
var planFut = cache.computeIfAbsent(key, k -> CompletableFuture.supplyAsync(() -> {
IgnitePlanner planner = ctx.planner();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index 7311a3f413..7a0f3c43ab 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
+import com.google.common.collect.ImmutableList;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
@@ -262,6 +263,21 @@ public class TraitUtils {
return traits.getTrait(DistributionTraitDef.INSTANCE);
}
+ /**
+ * Check distribution definition in traits.
+ *
+ * @param traitDefs Traits to analyze.
+ * @return {@code true} if distribution found, {@code false} otherwise.
+ */
+ public static boolean distributionPresent(ImmutableList<RelTraitDef> traitDefs) {
+ for (RelTraitDef<?> trait : traitDefs) {
+ if (trait instanceof DistributionTraitDef) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Collation. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 2cd28e9225..2327fa28e2 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -49,6 +49,7 @@ import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.QueryCancel;
@@ -161,6 +162,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
private final InternalTransaction tx;
+ private final HybridTimestamp txTs;
+
private CalciteCatalogReader catalogReader;
private long plannerTimeout;
@@ -175,6 +178,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
Object[] parameters,
IgniteLogger log,
InternalTransaction tx,
+ HybridTimestamp txTs,
long plannerTimeout
) {
super(Contexts.chain(cfg.getContext()));
@@ -187,6 +191,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
this.cancel = cancel;
this.parameters = parameters;
this.tx = tx;
+ this.txTs = txTs;
this.plannerTimeout = plannerTimeout;
RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, cfg.getTypeSystem());
@@ -240,6 +245,10 @@ public final class BaseQueryContext extends AbstractQueryContext {
return tx;
}
+ public HybridTimestamp transactionTime() {
+ return txTs;
+ }
+
public long plannerTimeout() {
return plannerTimeout;
}
@@ -281,7 +290,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
.logger(log)
.cancel(cancel)
.parameters(parameters)
- .transaction(tx);
+ .transaction(tx)
+ .transactionTime(txTs);
}
/**
@@ -313,6 +323,8 @@ public final class BaseQueryContext extends AbstractQueryContext {
private InternalTransaction tx;
+ private HybridTimestamp txTs;
+
private long plannerTimeout;
public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
@@ -345,13 +357,18 @@ public final class BaseQueryContext extends AbstractQueryContext {
return this;
}
+ public Builder transactionTime(HybridTimestamp txTs) {
+ this.txTs = txTs;
+ return this;
+ }
+
public Builder plannerTimeout(long plannerTimeout) {
this.plannerTimeout = plannerTimeout;
return this;
}
public BaseQueryContext build() {
- return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, tx, plannerTimeout);
+ return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, tx, txTs, plannerTimeout);
}
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index e2d560e357..f3628ad01e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
@@ -219,6 +220,7 @@ public class StopCalciteModuleTest {
when(tbl.tableId()).thenReturn(UUID.randomUUID());
+ when(txManager.begin()).thenReturn(mock(InternalTransaction.class));
when(tbl.storage()).thenReturn(mock(MvTableStorage.class));
when(tbl.storage().configuration()).thenReturn(mock(TableConfiguration.class));
when(tbl.storage().configuration().partitions()).thenReturn(mock(ConfigurationValue.class));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 114338f3d2..f9627646d5 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -81,6 +81,8 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -342,7 +344,7 @@ public class ExecutionServiceImplTest {
var mailboxRegistry = new MailboxRegistryImpl();
var exchangeService = new ExchangeServiceImpl(
- nodeId,
+ new ClusterNode(nodeId, "fake-test-node", NetworkAddress.from("127.0.0.1:1111")),
taskExecutor,
mailboxRegistry,
messageService
@@ -353,7 +355,7 @@ public class ExecutionServiceImplTest {
when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
var executionService = new ExecutionServiceImpl<>(
- nodeId,
+ new ClusterNode(nodeId, "fake-test-node", NetworkAddress.from("127.0.0.1:1111")),
messageService,
(single, filter) -> single ? List.of(nodeIds.get(ThreadLocalRandom.current().nextInt(nodeIds.size()))) : nodeIds,
schemaManagerMock,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index e743e60eb1..af6d1daf2b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -39,6 +39,8 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.Test;
/**
@@ -111,7 +113,7 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest {
.build(),
null,
UUID.randomUUID(),
- "fake-test-node",
+ new ClusterNode("1", "fake-test-node", NetworkAddress.from("127.0.0.1:1111")),
"fake-test-node",
null,
ArrayRowHandler.INSTANCE,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 7fb9ce6b6e..efa9e13d33 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
+import static org.mockito.Mockito.mock;
+
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.util.ArrayDeque;
import java.util.Deque;
@@ -45,7 +47,10 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -103,12 +108,12 @@ public class AbstractExecutionTest extends IgniteAbstractTest {
.build(),
taskExecutor,
UUID.randomUUID(),
- "fake-test-node",
+ new ClusterNode("1", "fake-test-node", NetworkAddress.from("127.0.0.1:1111")),
"fake-test-node",
fragmentDesc,
ArrayRowHandler.INSTANCE,
Map.of(),
- null
+ mock(InternalTransaction.class)
);
}