You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/01/05 22:29:32 UTC

[ignite-3] branch main updated: IGNITE-18044 Recovery collection of pending transaction on node start. (#1480)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 97542df0da IGNITE-18044 Recovery collection of pending transaction on node start. (#1480)
97542df0da is described below

commit 97542df0da63e3d1e36911bc9ca95a5052961c28
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Jan 6 02:29:26 2023 +0400

    IGNITE-18044 Recovery collection of pending transaction on node start. (#1480)
---
 .../ignite/internal/raft/server/RaftServer.java    |   8 +
 .../internal/raft/server/impl/JraftServerImpl.java |   6 +
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |   8 +-
 modules/runner/build.gradle                        |   1 +
 .../runner/app/ItIgniteNodeRestartTest.java        |   6 +-
 .../ItRaftCommandLeftInLogUntilRestartTest.java    | 299 +++++++++++++++++++++
 .../runner/app/ItTableApiContractTest.java         |   3 +-
 .../sql/engine/AbstractBasicIntegrationTest.java   |  60 ++++-
 .../table/distributed/raft/PartitionListener.java  |  16 ++
 9 files changed, 390 insertions(+), 17 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index 3e7aa42cc4..ea1aea77ff 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -105,4 +106,11 @@ public interface RaftServer extends IgniteComponent {
      */
     @TestOnly
     Set<RaftNodeId> localNodes();
+
+    /**
+     * Gets the raft server options.
+     *
+     * @return Raft server options.
+     */
+    NodeOptions options();
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 3ef4d314f7..135a642d0f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -498,6 +498,12 @@ public class JraftServerImpl implements RaftServer {
         return nodes.keySet();
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public NodeOptions options() {
+        return opts;
+    }
+
     /**
      * Blocks messages for raft group node according to provided predicate.
      *
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index 78690fa857..bbe6bd5ccc 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -72,7 +72,7 @@ public class FSMCallerImpl implements FSMCaller {
     /**
      * Task type
      */
-    private enum TaskType {
+    public enum TaskType {
         IDLE, //
         COMMITTED, //
         SNAPSHOT_SAVE, //
@@ -102,14 +102,14 @@ public class FSMCallerImpl implements FSMCaller {
         /** Raft node id. */
         NodeId nodeId;
 
-        TaskType type;
+        public TaskType type;
         // union fields
-        long committedIndex;
+        public long committedIndex;
         long term;
         Status status;
         LeaderChangeContext leaderChangeCtx;
         Closure done;
-        CountDownLatch shutdownLatch;
+        public CountDownLatch shutdownLatch;
 
         @Override
         public NodeId nodeId() {
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index ca19cb2016..48b9cd510a 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -63,6 +63,7 @@ dependencies {
     implementation libs.slf4j.jdk14
     implementation libs.typesafe.config
     implementation libs.auto.service.annotations
+    implementation libs.disruptor
 
     testAnnotationProcessor project(':ignite-configuration-annotation-processor')
     testAnnotationProcessor libs.auto.service
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index dbce306306..cb52ced0c7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -704,7 +704,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
      * Starts two nodes and checks that the data are storing through restarts. Nodes restart in the same order when they started at first.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18044")
     public void testTwoNodesRestartDirect() {
         twoNodesRestart(true);
     }
@@ -713,7 +712,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
      * Starts two nodes and checks that the data are storing through restarts. Nodes restart in reverse order when they started at first.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18044")
     public void testTwoNodesRestartReverse() {
         twoNodesRestart(false);
     }
@@ -1039,11 +1037,13 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
      * @param replicas Replica factor.
      * @param partitions Partitions count.
      */
-    private static void createTableWithoutData(Ignite ignite, String name, int replicas, int partitions) {
+    private static Table createTableWithoutData(Ignite ignite, String name, int replicas, int partitions) {
         try (Session session = ignite.sql().createSession()) {
             session.execute(null, "CREATE TABLE " + name
                     + "(id INT PRIMARY KEY, name VARCHAR) WITH replicas=" + replicas + ", partitions=" + partitions);
         }
+
+        return ignite.tables().table(name);
     }
 
     /**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
new file mode 100644
index 0000000000..dd208ea7f2
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl.TaskType;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
+import org.apache.ignite.raft.jraft.entity.NodeId;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The class has tests of cluster recovery when not all committed RAFT commands are applied to the state machine.
+ */
+public class ItRaftCommandLeftInLogUntilRestartTest extends AbstractBasicIntegrationTest {
+
+    private final Object[][] dataSet = {
+            {1, "Igor", 10d},
+            {2, null, 15d},
+            {3, "Ilya", 15d},
+            {4, "Roma", 10d}
+    };
+
+    @Override
+    protected int nodes() {
+        return 2;
+    }
+
+    /**
+     * Tests recovery of transaction commit from RAFT log on restart.
+     *
+     * @throws Exception If fail.
+     */
+    @Test
+    public void testCommitCommand() throws Exception {
+        restartClusterWithNotAppliedCommands(
+                tx -> insertDataInTransaction(tx, "person", List.of("ID", "NAME", "SALARY"), dataSet),
+                tx -> {
+                },
+                ignite -> checkData(ignite, dataSet)
+        );
+    }
+
+    /**
+     * Tests recovery of transaction Update All operation from RAFT log on restart.
+     *
+     * @throws Exception If fail.
+     */
+    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because indexes are required to apply RAFT commands on restart , "
+            + "but the table have not started yet.")
+    public void testUpdateAllCommand() throws Exception {
+        restartClusterWithNotAppliedCommands(
+                tx -> {
+                },
+                tx -> insertDataInTransaction(tx, "person", List.of("ID", "NAME", "SALARY"), dataSet),
+                ignite -> checkData(ignite, dataSet)
+        );
+    }
+
+    /**
+     * Tests recovery of transaction commit from RAFT log on restart using key value view.
+     *
+     * @throws Exception If fail.
+     */
+    @Test
+    public void testCommitCommandKeyValueView() throws Exception {
+        restartClusterWithNotAppliedCommands(
+                tx -> {
+                    var kvView = CLUSTER_NODES.get(0).tables().table(DEFAULT_TABLE_NAME).keyValueView();
+
+                    for (var row : dataSet) {
+                        kvView.put(tx, Tuple.create().set("ID", row[0]), Tuple.create().set("NAME", row[1]).set("SALARY", row[2]));
+                    }
+                },
+                tx -> {
+                },
+                ignite -> checkData(ignite, dataSet)
+        );
+    }
+
+    /**
+     * Tests recovery of transaction Update All operation from RAFT log on restart using key value view.
+     *
+     * @throws Exception If fail.
+     */
+    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because indexes are required to apply RAFT commands on restart , "
+            + "but the table have not started yet.")
+    public void testUpdateCommandKeyValueView() throws Exception {
+        restartClusterWithNotAppliedCommands(
+                tx -> {
+                },
+                tx -> {
+                    var kvView = CLUSTER_NODES.get(0).tables().table(DEFAULT_TABLE_NAME).keyValueView();
+
+                    for (var row : dataSet) {
+                        kvView.put(tx, Tuple.create().set("ID", row[0]), Tuple.create().set("NAME", row[1]).set("SALARY", row[2]));
+                    }
+                },
+                ignite -> checkData(ignite, dataSet)
+        );
+    }
+
+    /**
+     * Restarts a test cluster so that a part of the RAFT commands is applied from the log on the second start.
+     *
+     * @param beforeBlock An action which is applied on all nodes in the test cluster.
+     * @param afterBlock An action which is applied on leader and written in log on follower.
+     * @param checkAction An action to check data after restart.
+     * @throws Exception If fail.
+     */
+    public void restartClusterWithNotAppliedCommands(
+            Consumer<Transaction> beforeBlock,
+            Consumer<Transaction> afterBlock,
+            Consumer<IgniteImpl> checkAction
+    ) throws Exception {
+        var node0 = (IgniteImpl) CLUSTER_NODES.get(0);
+        var node1 = (IgniteImpl) CLUSTER_NODES.get(1);
+
+        AtomicReference<IgniteBiTuple<ClusterNode, String>> leaderAndGroupRef = new AtomicReference<>();
+
+        var appliedIndexNode0 = partitionUpdateInhibitor(node0, leaderAndGroupRef);
+        var appliedIndexNode1 = partitionUpdateInhibitor(node1, leaderAndGroupRef);
+
+        TableImpl table = (TableImpl) createTable(DEFAULT_TABLE_NAME, 2, 1);
+
+        ClusterNode leader = table.internalTable().leaderAssignment(0);
+
+        boolean isNode0Leader = node0.id().equals(leader.id());
+
+        BinaryRowEx key = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 42));
+
+        if (isNode0Leader) {
+            assertNull(table.internalTable().get(key, new HybridClockImpl().now(), node1.node()).get());
+        } else {
+            assertNull(table.internalTable().get(key, new HybridClockImpl().now(), node0.node()).get());
+        }
+
+        var tx = node0.transactions().begin();
+
+        try {
+            beforeBlock.accept(tx);
+
+            assertTrue(IgniteTestUtils.waitForCondition(() -> appliedIndexNode0.get() == appliedIndexNode1.get(), 10_000));
+
+            leaderAndGroupRef.set(new IgniteBiTuple<>(leader, table.tableId() + "_part_0"));
+
+            afterBlock.accept(tx);
+
+            tx.commit();
+        } finally {
+            //TODO: IGNITE-18324 The rollback invocation should do nothing when a transaction is already committed.
+            //tx.rollback();
+        }
+
+        stopNodes();
+        startCluster();
+
+        var node0Started = (IgniteImpl) CLUSTER_NODES.get(0);
+        var node1Started = (IgniteImpl) CLUSTER_NODES.get(1);
+
+        var ignite = isNode0Leader ? node1Started : node0Started;
+
+        checkAction.accept(ignite);
+
+        clearData(ignite.tables().table(DEFAULT_TABLE_NAME));
+    }
+
+    /**
+     * Inhibits updates on follower node after leader and group name are assigned.
+     *
+     * @param node Node which storage updates will be inhibited.
+     * @param leaderAndGroupRef Pair consisting of the leader and the RAFT group name.
+     * @return Atomic long that represents an applied index.
+     */
+    private static AtomicLong partitionUpdateInhibitor(
+            IgniteImpl node,
+            AtomicReference<IgniteBiTuple<ClusterNode, String>> leaderAndGroupRef
+    ) {
+        AtomicLong appliedIndex = new AtomicLong();
+
+        var nodeOptions = node.raftManager().server().options();
+
+        nodeOptions.setfSMCallerExecutorDisruptor(new StripedDisruptor<>(
+                NamedThreadFactory.threadPrefix(node.name() + "-test", "JRaft-FSMCaller-Disruptor"),
+                64,
+                () -> new ApplyTask(),
+                1
+        ) {
+            @Override
+            public RingBuffer<ApplyTask> subscribe(NodeId group, EventHandler<ApplyTask> handler,
+                    BiConsumer<ApplyTask, Throwable> exceptionHandler) {
+                return super.subscribe(group, (event, sequence, endOfBatch) -> {
+                    if (leaderAndGroupRef.get() != null
+                            && event.nodeId().getGroupId().equals(leaderAndGroupRef.get().get2())
+                            && !node.node().equals(leaderAndGroupRef.get().get1())) {
+                        log.info("Event for RAFT [grp={}, type={}, idx={}]", event.nodeId().getGroupId(), event.type, event.committedIndex);
+
+                        if (event.type == TaskType.SHUTDOWN) {
+                            event.shutdownLatch.countDown();
+                        }
+
+                        return;
+                    }
+
+                    handler.onEvent(event, sequence, endOfBatch);
+
+                    appliedIndex.set(event.committedIndex);
+                }, exceptionHandler);
+            }
+        });
+
+        return appliedIndex;
+    }
+
+    private void checkData(IgniteImpl ignite, Object[][] dataSet) {
+        TableImpl table = (TableImpl) ignite.tables().table(DEFAULT_TABLE_NAME);
+
+        assertNotNull(table);
+        // Each row is {ID, NAME, SALARY}; a failed read is rethrown below so that it fails the test instead of being dropped.
+        for (Object[] row : dataSet) {
+            try {
+                Tuple txTuple = table.keyValueView().get(null, Tuple.create().set("ID", row[0]));
+
+                assertNotNull(txTuple);
+
+                assertEquals(row[1], txTuple.value("NAME"));
+                assertEquals(row[2], txTuple.value("SALARY"));
+
+                BinaryRowEx testKey = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("ID", row[0]));
+
+                BinaryRow readOnlyRow = table.internalTable().get(testKey, new HybridClockImpl().now(), ignite.node()).get();
+
+                //TODO: IGNITE-18497 The read-only check is possible only after the read-only read is fixed.
+                //assertNotNull(readOnlyRow);
+                //assertEquals(row[1], new Row(table.schemaView().schema(), readOnlyRow).stringValue(2));
+                //assertEquals(row[2], new Row(table.schemaView().schema(), readOnlyRow).doubleValue(1));
+            } catch (Exception e) {
+                throw new RuntimeException(IgniteStringFormatter.format("Cannot check a row {}", row), e);
+            }
+        }
+    }
+
+    /**
+     * Clears data with primary keys from 0 to 99.
+     *
+     * @param table Ignite table.
+     */
+    private static void clearData(Table table) {
+        ArrayList<Tuple> keysToRemove = new ArrayList<>(100);
+
+        IntStream.range(0, 100).forEach(rowId -> keysToRemove.add(Tuple.create().set("ID", rowId)));
+
+        table.keyValueView().removeAll(null, keysToRemove);
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
index 88a242487c..a746ab279b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
@@ -36,7 +36,6 @@ import org.apache.ignite.table.Table;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
 
 /**
  * There are tests which check a table managment contract.
@@ -65,7 +64,7 @@ public class ItTableApiContractTest extends AbstractBasicIntegrationTest {
      * Before all tests.
      */
     @BeforeAll
-    static void beforeAll(TestInfo testInfo) throws Exception {
+    static void beforeAll() {
         ignite = CLUSTER_NODES.get(0);
     }
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
index 58201db721..6696801757 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
@@ -78,6 +78,9 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
     /** Timeout should be big enough to prevent premature session expiration. */
     private static final long SESSION_IDLE_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
 
+    /** Test default table name. */
+    protected static final String DEFAULT_TABLE_NAME = "person";
+
     /** Base port number. */
     private static final int BASE_PORT = 3344;
 
@@ -98,13 +101,29 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
     @WorkDirectory
     private static Path WORK_DIR;
 
+    /** Information object that is initialised on the test startup. */
+    private TestInfo testInfo;
+
     /**
      * Before all.
      *
      * @param testInfo Test information object.
      */
     @BeforeAll
-    void startNodes(TestInfo testInfo) {
+    void beforeAll(TestInfo testInfo) {
+        LOG.info("Start beforeAll()");
+
+        this.testInfo = testInfo;
+
+        startCluster();
+
+        LOG.info("End beforeAll()");
+    }
+
+    /**
+     * Starts and initializes a test cluster.
+     */
+    protected void startCluster() {
         String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
 
         List<CompletableFuture<Ignite>> futures = new ArrayList<>();
@@ -141,9 +160,18 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
      * After all.
      */
     @AfterAll
-    void stopNodes(TestInfo testInfo) throws Exception {
-        LOG.info("Start tearDown()");
+    void afterAll() throws Exception {
+        LOG.info("Start afterAll()");
 
+        stopNodes();
+
+        LOG.info("End afterAll()");
+    }
+
+    /**
+     * Stops all started nodes.
+     */
+    protected void stopNodes() throws Exception {
         List<AutoCloseable> closeables = IntStream.range(0, nodes())
                 .mapToObj(i -> testNodeName(testInfo, i))
                 .map(nodeName -> (AutoCloseable) () -> IgnitionManager.stop(nodeName))
@@ -152,8 +180,6 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
         IgniteUtils.closeAll(closeables);
 
         CLUSTER_NODES.clear();
-
-        LOG.info("End tearDown()");
     }
 
     /** Drops all visible tables. */
@@ -223,6 +249,20 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
                 .collect(Collectors.joining("','", "/*+ DISABLE_RULE('", "') */"))));
     }
 
+    /**
+     * Creates a table.
+     *
+     * @param name Table name.
+     * @param replicas Replica factor.
+     * @param partitions Partitions count.
+     */
+    protected static Table createTable(String name, int replicas, int partitions) {
+        sql(IgniteStringFormatter.format("CREATE TABLE IF NOT EXISTS {} (id INT PRIMARY KEY, name VARCHAR, salary DOUBLE) "
+                + "WITH replicas={}, partitions={}", name, replicas, partitions));
+
+        return CLUSTER_NODES.get(0).tables().table(name);
+    }
+
     enum JoinType {
         NESTED_LOOP(
                 "CorrelatedNestedLoopJoin",
@@ -250,7 +290,7 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
     }
 
     protected static void createAndPopulateTable() {
-        sql("CREATE TABLE person (id INT PRIMARY KEY, name VARCHAR, salary DOUBLE)");
+        createTable(DEFAULT_TABLE_NAME, 1, 8);
 
         int idx = 0;
 
@@ -270,14 +310,18 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
     protected static void insertData(String tblName, List<String> columnNames, Object[]... tuples) {
         Transaction tx = CLUSTER_NODES.get(0).transactions().begin();
 
+        insertDataInTransaction(tx, tblName, columnNames, tuples);
+
+        tx.commit();
+    }
+
+    protected static void insertDataInTransaction(Transaction tx, String tblName, List<String> columnNames, Object[][] tuples) {
         String insertStmt = "INSERT INTO " + tblName + "(" + String.join(", ", columnNames) + ")"
                 + " VALUES (" + ", ?".repeat(columnNames.size()).substring(2) + ")";
 
         for (Object[] args : tuples) {
             sql(tx, insertStmt, args);
         }
-
-        tx.commit();
     }
 
     protected static void checkData(Table table, String[] columnNames, Object[]... tuples) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 47d872a56e..aa7f0c7ab3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Command;
@@ -48,7 +49,9 @@ import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
@@ -107,6 +110,19 @@ public class PartitionListener implements RaftGroupListener {
         this.txManager = txManager;
         this.indexes = indexes;
         this.partitionId = partitionId;
+
+        // TODO: IGNITE-18502 Implement a pending update storage
+        try (PartitionTimestampCursor cursor = partitionDataStorage.getStorage().scan(HybridTimestamp.MAX_VALUE)) {
+            if (cursor != null) {
+                while (cursor.hasNext()) {
+                    ReadResult readResult = cursor.next();
+
+                    if (readResult.isWriteIntent()) {
+                        txsPendingRowIds.computeIfAbsent(readResult.transactionId(), key -> new HashSet()).add(readResult.rowId());
+                    }
+                }
+            }
+        }
     }
 
     @Override