You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/01/05 22:29:32 UTC
[ignite-3] branch main updated: IGNITE-18044 Recovery collection of pending transaction on node start. (#1480)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 97542df0da IGNITE-18044 Recovery collection of pending transaction on node start. (#1480)
97542df0da is described below
commit 97542df0da63e3d1e36911bc9ca95a5052961c28
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Jan 6 02:29:26 2023 +0400
IGNITE-18044 Recovery collection of pending transaction on node start. (#1480)
---
.../ignite/internal/raft/server/RaftServer.java | 8 +
.../internal/raft/server/impl/JraftServerImpl.java | 6 +
.../ignite/raft/jraft/core/FSMCallerImpl.java | 8 +-
modules/runner/build.gradle | 1 +
.../runner/app/ItIgniteNodeRestartTest.java | 6 +-
.../ItRaftCommandLeftInLogUntilRestartTest.java | 299 +++++++++++++++++++++
.../runner/app/ItTableApiContractTest.java | 3 +-
.../sql/engine/AbstractBasicIntegrationTest.java | 60 ++++-
.../table/distributed/raft/PartitionListener.java | 16 ++
9 files changed, 390 insertions(+), 17 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index 3e7aa42cc4..ea1aea77ff 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.jetbrains.annotations.TestOnly;
/**
@@ -105,4 +106,11 @@ public interface RaftServer extends IgniteComponent {
*/
@TestOnly
Set<RaftNodeId> localNodes();
+
+ /**
+ * Get a raft server options.
+ *
+ * @return Raft server options.
+ */
+ NodeOptions options();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 3ef4d314f7..135a642d0f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -498,6 +498,12 @@ public class JraftServerImpl implements RaftServer {
return nodes.keySet();
}
+ /** {@inheritDoc} */
+ @Override
+ public NodeOptions options() {
+ return opts;
+ }
+
/**
* Blocks messages for raft group node according to provided predicate.
*
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index 78690fa857..bbe6bd5ccc 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -72,7 +72,7 @@ public class FSMCallerImpl implements FSMCaller {
/**
* Task type
*/
- private enum TaskType {
+ public enum TaskType {
IDLE, //
COMMITTED, //
SNAPSHOT_SAVE, //
@@ -102,14 +102,14 @@ public class FSMCallerImpl implements FSMCaller {
/** Raft node id. */
NodeId nodeId;
- TaskType type;
+ public TaskType type;
// union fields
- long committedIndex;
+ public long committedIndex;
long term;
Status status;
LeaderChangeContext leaderChangeCtx;
Closure done;
- CountDownLatch shutdownLatch;
+ public CountDownLatch shutdownLatch;
@Override
public NodeId nodeId() {
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index ca19cb2016..48b9cd510a 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -63,6 +63,7 @@ dependencies {
implementation libs.slf4j.jdk14
implementation libs.typesafe.config
implementation libs.auto.service.annotations
+ implementation libs.disruptor
testAnnotationProcessor project(':ignite-configuration-annotation-processor')
testAnnotationProcessor libs.auto.service
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index dbce306306..cb52ced0c7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -704,7 +704,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* Starts two nodes and checks that the data are storing through restarts. Nodes restart in the same order when they started at first.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18044")
public void testTwoNodesRestartDirect() {
twoNodesRestart(true);
}
@@ -713,7 +712,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* Starts two nodes and checks that the data are storing through restarts. Nodes restart in reverse order when they started at first.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18044")
public void testTwoNodesRestartReverse() {
twoNodesRestart(false);
}
@@ -1039,11 +1037,13 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* @param replicas Replica factor.
* @param partitions Partitions count.
*/
- private static void createTableWithoutData(Ignite ignite, String name, int replicas, int partitions) {
+ private static Table createTableWithoutData(Ignite ignite, String name, int replicas, int partitions) {
try (Session session = ignite.sql().createSession()) {
session.execute(null, "CREATE TABLE " + name
+ "(id INT PRIMARY KEY, name VARCHAR) WITH replicas=" + replicas + ", partitions=" + partitions);
}
+
+ return ignite.tables().table(name);
}
/**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
new file mode 100644
index 0000000000..dd208ea7f2
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl.TaskType;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
+import org.apache.ignite.raft.jraft.entity.NodeId;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The class has tests of cluster recovery when not all committed RAFT commands were applied to the state machine.
+ */
+public class ItRaftCommandLeftInLogUntilRestartTest extends AbstractBasicIntegrationTest {
+
+ private final Object[][] dataSet = {
+ {1, "Igor", 10d},
+ {2, null, 15d},
+ {3, "Ilya", 15d},
+ {4, "Roma", 10d}
+ };
+
+ /** Number of nodes in the test cluster. */
+ @Override
+ protected int nodes() {
+ // The tests in this class create the table with 2 replicas and address the nodes by index 0 and 1.
+ return 2;
+ }
+
+    /**
+     * Checks that a transaction commit is recovered from the RAFT log when the cluster is restarted.
+     *
+     * <p>The rows are inserted before the non-leader node stops applying commands, nothing else is done afterwards,
+     * so the commit is the only operation that node has not applied before the restart.
+     *
+     * @throws Exception If fail.
+     */
+    @Test
+    public void testCommitCommand() throws Exception {
+        Consumer<Transaction> insertRows = tx -> insertDataInTransaction(tx, "person", List.of("ID", "NAME", "SALARY"), dataSet);
+        Consumer<Transaction> doNothing = tx -> {
+        };
+        Consumer<IgniteImpl> verifyRows = ignite -> checkData(ignite, dataSet);
+
+        restartClusterWithNotAppliedCommands(insertRows, doNothing, verifyRows);
+    }
+
+    /**
+     * Tests recovery of transaction Update All operation from RAFT log on restart.
+     *
+     * <p>The rows are inserted after the non-leader node stops applying commands, so both the updates and the commit
+     * are left unapplied on that node before the restart.
+     *
+     * @throws Exception If fail.
+     */
+    // @Test is required next to @Disabled: without it JUnit does not discover the method at all, so it is neither
+    // reported as skipped nor executed once the @Disabled annotation is removed.
+    @Test
+    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because indexes are required to apply RAFT commands on restart , "
+            + "but the table have not started yet.")
+    public void testUpdateAllCommand() throws Exception {
+        restartClusterWithNotAppliedCommands(
+                tx -> {
+                },
+                tx -> insertDataInTransaction(tx, "person", List.of("ID", "NAME", "SALARY"), dataSet),
+                ignite -> checkData(ignite, dataSet)
+        );
+    }
+
+    /**
+     * Checks that a transaction commit is recovered from the RAFT log on restart when the rows were written
+     * through the key value view.
+     *
+     * <p>All puts happen before the non-leader node stops applying commands; only the commit is left unapplied there.
+     *
+     * @throws Exception If fail.
+     */
+    @Test
+    public void testCommitCommandKeyValueView() throws Exception {
+        Consumer<Transaction> putRows = tx -> {
+            var view = CLUSTER_NODES.get(0).tables().table(DEFAULT_TABLE_NAME).keyValueView();
+
+            for (Object[] fields : dataSet) {
+                Tuple key = Tuple.create().set("ID", fields[0]);
+                Tuple value = Tuple.create().set("NAME", fields[1]).set("SALARY", fields[2]);
+
+                view.put(tx, key, value);
+            }
+        };
+        Consumer<Transaction> doNothing = tx -> {
+        };
+        Consumer<IgniteImpl> verifyRows = ignite -> checkData(ignite, dataSet);
+
+        restartClusterWithNotAppliedCommands(putRows, doNothing, verifyRows);
+    }
+
+    /**
+     * Tests recovery of transaction Update All operation from RAFT log on restart using key value view.
+     *
+     * <p>The puts are issued after the non-leader node stops applying commands, so both the updates and the commit
+     * are left unapplied on that node before the restart.
+     *
+     * @throws Exception If fail.
+     */
+    // @Test is required next to @Disabled: without it JUnit does not discover the method at all, so it is neither
+    // reported as skipped nor executed once the @Disabled annotation is removed.
+    @Test
+    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because indexes are required to apply RAFT commands on restart , "
+            + "but the table have not started yet.")
+    public void testUpdateCommandKeyValueView() throws Exception {
+        restartClusterWithNotAppliedCommands(
+                tx -> {
+                },
+                tx -> {
+                    var kvView = CLUSTER_NODES.get(0).tables().table(DEFAULT_TABLE_NAME).keyValueView();
+
+                    for (var row : dataSet) {
+                        kvView.put(tx, Tuple.create().set("ID", row[0]), Tuple.create().set("NAME", row[1]).set("SALARY", row[2]));
+                    }
+                },
+                ignite -> checkData(ignite, dataSet)
+        );
+    }
+
+ /**
+ * Restarts the test cluster in a state where a part of the RAFT commands has not been applied on the non-leader node,
+ * so that they have to be applied from the log on the second start.
+ *
+ * <p>Sequence: install the update inhibitors on both nodes, create the default table (2 replicas, 1 partition),
+ * run {@code beforeBlock}, wait until both nodes report the same applied index, arm the inhibitor,
+ * run {@code afterBlock}, commit, restart the cluster, run {@code checkAction} on the node that was not the leader
+ * and finally remove the test rows.
+ *
+ * @param beforeBlock An action executed in the transaction before the inhibitor is armed, i.e. applied on all nodes.
+ * @param afterBlock An action executed in the same transaction after the inhibitor is armed; it is applied on leader and
+ *     written in log on follower.
+ * @param checkAction An action to check data after restart; it receives the restarted node that was not the leader.
+ * @throws Exception If fail.
+ */
+ public void restartClusterWithNotAppliedCommands(
+ Consumer<Transaction> beforeBlock,
+ Consumer<Transaction> afterBlock,
+ Consumer<IgniteImpl> checkAction
+ ) throws Exception {
+ var node0 = (IgniteImpl) CLUSTER_NODES.get(0);
+ var node1 = (IgniteImpl) CLUSTER_NODES.get(1);
+
+ // Stays empty (inhibitors passive) until the leader and the group name are known and beforeBlock has been applied.
+ AtomicReference<IgniteBiTuple<ClusterNode, String>> leaderAndGroupRef = new AtomicReference<>();
+
+ // The inhibitors are installed into the RAFT server options of both nodes before the table is created.
+ var appliedIndexNode0 = partitionUpdateInhibitor(node0, leaderAndGroupRef);
+ var appliedIndexNode1 = partitionUpdateInhibitor(node1, leaderAndGroupRef);
+
+ TableImpl table = (TableImpl) createTable(DEFAULT_TABLE_NAME, 2, 1);
+
+ // Leader of the only partition (index 0).
+ ClusterNode leader = table.internalTable().leaderAssignment(0);
+
+ boolean isNode0Leader = node0.id().equals(leader.id());
+
+ BinaryRowEx key = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 42));
+
+ // Sanity read of a key that was never inserted (id = 42), passing the non-leader node as the third argument.
+ // NOTE(review): presumably that argument is the node the read is served by - confirm against InternalTable#get.
+ if (isNode0Leader) {
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(), node1.node()).get());
+ } else {
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(), node0.node()).get());
+ }
+
+ // The transaction is always started on the first node, regardless of which node is the leader.
+ var tx = node0.transactions().begin();
+
+ try {
+ beforeBlock.accept(tx);
+
+ // Wait (up to 10 seconds) until both nodes report the same applied index before arming the inhibitor.
+ assertTrue(IgniteTestUtils.waitForCondition(() -> appliedIndexNode0.get() == appliedIndexNode1.get(), 10_000));
+
+ // Arms the inhibitor: from now on events of this group are dropped on every node except the leader.
+ leaderAndGroupRef.set(new IgniteBiTuple<>(leader, table.tableId() + "_part_0"));
+
+ afterBlock.accept(tx);
+
+ tx.commit();
+ } finally {
+ //TODO: IGNITE-18324 Nothing do in the rollback invocation when a transaction is committed.
+ //tx.rollback();
+ }
+
+ stopNodes();
+ startCluster();
+
+ // CLUSTER_NODES is read again after the restart to obtain the new node instances.
+ var node0Started = (IgniteImpl) CLUSTER_NODES.get(0);
+ var node1Started = (IgniteImpl) CLUSTER_NODES.get(1);
+
+ // The check runs on the node that was NOT the leader, i.e. the one whose updates were inhibited.
+ var ignite = isNode0Leader ? node1Started : node0Started;
+
+ checkAction.accept(ignite);
+
+ // Remove the test rows at the end of the scenario.
+ clearData(ignite.tables().table(DEFAULT_TABLE_NAME));
+ }
+
+ /**
+ * Installs a custom FSM caller disruptor into the RAFT server options of the node. The disruptor wraps every subscribed
+ * event handler: once {@code leaderAndGroupRef} is set, events of the referenced RAFT group are logged and dropped
+ * on any node that is not the referenced leader; all other events are passed to the original handler.
+ *
+ * @param node Node which storage updates will be inhibited.
+ * @param leaderAndGroupRef Reference to a pair of the leader node and the RAFT group name; inhibition is off while it is empty.
+ * @return Atomic long that holds the committed index of the last event passed to the original handler on this node.
+ */
+ private static AtomicLong partitionUpdateInhibitor(
+ IgniteImpl node,
+ AtomicReference<IgniteBiTuple<ClusterNode, String>> leaderAndGroupRef
+ ) {
+ AtomicLong appliedIndex = new AtomicLong();
+
+ var nodeOptions = node.raftManager().server().options();
+
+ // Replace the FSM caller disruptor with an anonymous subclass that intercepts handler subscription.
+ nodeOptions.setfSMCallerExecutorDisruptor(new StripedDisruptor<>(
+ NamedThreadFactory.threadPrefix(node.name() + "-test", "JRaft-FSMCaller-Disruptor"),
+ 64,
+ () -> new ApplyTask(),
+ 1
+ ) {
+ @Override
+ public RingBuffer<ApplyTask> subscribe(NodeId group, EventHandler<ApplyTask> handler,
+ BiConsumer<ApplyTask, Throwable> exceptionHandler) {
+ return super.subscribe(group, (event, sequence, endOfBatch) -> {
+ // Drop the event when inhibition is armed, the event belongs to the target group and this node is not the leader.
+ if (leaderAndGroupRef.get() != null
+ && event.nodeId().getGroupId().equals(leaderAndGroupRef.get().get2())
+ && !node.node().equals(leaderAndGroupRef.get().get1())) {
+ log.info("Event for RAFT [grp={}, type={}, idx={}]", event.nodeId().getGroupId(), event.type, event.committedIndex);
+
+ // The original handler is skipped, so the shutdown latch is released here instead.
+ // NOTE(review): presumably this is what allows the node to stop while inhibited - confirm.
+ if (event.type == TaskType.SHUTDOWN) {
+ event.shutdownLatch.countDown();
+ }
+
+ return;
+ }
+
+ handler.onEvent(event, sequence, endOfBatch);
+
+ // Recorded only for events that were actually passed to the original handler.
+ appliedIndex.set(event.committedIndex);
+ }, exceptionHandler);
+ }
+ });
+
+ return appliedIndex;
+ }
+
+    /**
+     * Checks that every row of the data set can be read from the default table of the given node through the key value view.
+     *
+     * @param ignite Node to read the data from.
+     * @param dataSet Expected rows; element 0 is the ID, element 1 the NAME and element 2 the SALARY.
+     * @throws RuntimeException If reading a row fails with an exception.
+     */
+    private void checkData(IgniteImpl ignite, Object[][] dataSet) {
+        TableImpl table = (TableImpl) ignite.tables().table(DEFAULT_TABLE_NAME);
+
+        assertNotNull(table);
+
+        for (Object[] row : dataSet) {
+            try {
+                Tuple txTuple = table.keyValueView().get(null, Tuple.create().set("ID", row[0]));
+
+                assertNotNull(txTuple);
+
+                assertEquals(row[1], txTuple.value("NAME"));
+                assertEquals(row[2], txTuple.value("SALARY"));
+
+                BinaryRowEx testKey = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("ID", row[0]));
+
+                BinaryRow readOnlyRow = table.internalTable().get(testKey, new HybridClockImpl().now(), ignite.node()).get();
+
+                //TODO: IGNITE-18497 Readonly check is possible only when the readonly read will be fixed.
+                //assertNotNull(readOnlyRow);
+                //assertEquals(row[1], new Row(table.schemaView().schema(), readOnlyRow).stringValue(2));
+                //assertEquals(row[2], new Row(table.schemaView().schema(), readOnlyRow).doubleValue(1));
+            } catch (Exception e) {
+                // The row is rendered explicitly: passing the Object[] itself would be taken as the varargs array,
+                // so only its first element would get into the message.
+                String message = IgniteStringFormatter.format("Cannot check a row {}", java.util.Arrays.toString(row));
+
+                // The exception must be thrown; merely constructing it silently hides the read failure.
+                throw new RuntimeException(message, e);
+            }
+        }
+    }
+
+    /**
+     * Removes the rows with primary keys from 0 (inclusive) to 100 (exclusive) through the key value view.
+     *
+     * @param table Ignite table.
+     */
+    private static void clearData(Table table) {
+        ArrayList<Tuple> keys = new ArrayList<>(100);
+
+        for (int id = 0; id < 100; id++) {
+            keys.add(Tuple.create().set("ID", id));
+        }
+
+        table.keyValueView().removeAll(null, keys);
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
index 88a242487c..a746ab279b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
@@ -36,7 +36,6 @@ import org.apache.ignite.table.Table;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
/**
* There are tests which check a table managment contract.
@@ -65,7 +64,7 @@ public class ItTableApiContractTest extends AbstractBasicIntegrationTest {
* Before all tests.
*/
@BeforeAll
- static void beforeAll(TestInfo testInfo) throws Exception {
+ static void beforeAll() {
ignite = CLUSTER_NODES.get(0);
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
index 58201db721..6696801757 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
@@ -78,6 +78,9 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
/** Timeout should be big enough to prevent premature session expiration. */
private static final long SESSION_IDLE_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
+ /** Test default table name. */
+ protected static final String DEFAULT_TABLE_NAME = "person";
+
/** Base port number. */
private static final int BASE_PORT = 3344;
@@ -98,13 +101,29 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
@WorkDirectory
private static Path WORK_DIR;
+ /** Information object that is initialised on the test startup. */
+ private TestInfo testInfo;
+
/**
* Before all.
*
* @param testInfo Test information object.
*/
@BeforeAll
- void startNodes(TestInfo testInfo) {
+ void beforeAll(TestInfo testInfo) {
+ LOG.info("Start beforeAll()");
+
+ this.testInfo = testInfo;
+
+ startCluster();
+
+ LOG.info("End beforeAll()");
+ }
+
+ /**
+ * Starts and initializes a test cluster.
+ */
+ protected void startCluster() {
String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
List<CompletableFuture<Ignite>> futures = new ArrayList<>();
@@ -141,9 +160,18 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
* After all.
*/
@AfterAll
- void stopNodes(TestInfo testInfo) throws Exception {
- LOG.info("Start tearDown()");
+ void afterAll() throws Exception {
+ LOG.info("Start afterAll()");
+ stopNodes();
+
+ LOG.info("End afterAll()");
+ }
+
+ /**
+ * Stops all started nodes.
+ */
+ protected void stopNodes() throws Exception {
List<AutoCloseable> closeables = IntStream.range(0, nodes())
.mapToObj(i -> testNodeName(testInfo, i))
.map(nodeName -> (AutoCloseable) () -> IgnitionManager.stop(nodeName))
@@ -152,8 +180,6 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
IgniteUtils.closeAll(closeables);
CLUSTER_NODES.clear();
-
- LOG.info("End tearDown()");
}
/** Drops all visible tables. */
@@ -223,6 +249,20 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
.collect(Collectors.joining("','", "/*+ DISABLE_RULE('", "') */"))));
}
+ /**
+ * Creates a table with the columns (id INT PRIMARY KEY, name VARCHAR, salary DOUBLE) if it does not exist yet.
+ *
+ * @param name Table name.
+ * @param replicas Replica factor.
+ * @param partitions Partitions count.
+ * @return Table with the given name, obtained from the first cluster node.
+ */
+ protected static Table createTable(String name, int replicas, int partitions) {
+ sql(IgniteStringFormatter.format("CREATE TABLE IF NOT EXISTS {} (id INT PRIMARY KEY, name VARCHAR, salary DOUBLE) "
+ + "WITH replicas={}, partitions={}", name, replicas, partitions));
+
+ return CLUSTER_NODES.get(0).tables().table(name);
+ }
+
enum JoinType {
NESTED_LOOP(
"CorrelatedNestedLoopJoin",
@@ -250,7 +290,7 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
}
protected static void createAndPopulateTable() {
- sql("CREATE TABLE person (id INT PRIMARY KEY, name VARCHAR, salary DOUBLE)");
+ createTable(DEFAULT_TABLE_NAME, 1, 8);
int idx = 0;
@@ -270,14 +310,18 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
protected static void insertData(String tblName, List<String> columnNames, Object[]... tuples) {
Transaction tx = CLUSTER_NODES.get(0).transactions().begin();
+ insertDataInTransaction(tx, tblName, columnNames, tuples);
+
+ tx.commit();
+ }
+
+ protected static void insertDataInTransaction(Transaction tx, String tblName, List<String> columnNames, Object[][] tuples) {
String insertStmt = "INSERT INTO " + tblName + "(" + String.join(", ", columnNames) + ")"
+ " VALUES (" + ", ?".repeat(columnNames.size()).substring(2) + ")";
for (Object[] args : tuples) {
sql(tx, insertStmt, args);
}
-
- tx.commit();
}
protected static void checkData(Table table, String[] columnNames, Object[]... tuples) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 47d872a56e..aa7f0c7ab3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Command;
@@ -48,7 +49,9 @@ import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
@@ -107,6 +110,19 @@ public class PartitionListener implements RaftGroupListener {
this.txManager = txManager;
this.indexes = indexes;
this.partitionId = partitionId;
+
+ // TODO: IGNITE-18502 Implement a pending update storage
+ try (PartitionTimestampCursor cursor = partitionDataStorage.getStorage().scan(HybridTimestamp.MAX_VALUE)) {
+ if (cursor != null) {
+ while (cursor.hasNext()) {
+ ReadResult readResult = cursor.next();
+
+ if (readResult.isWriteIntent()) {
+ txsPendingRowIds.computeIfAbsent(readResult.transactionId(), key -> new HashSet()).add(readResult.rowId());
+ }
+ }
+ }
+ }
}
@Override