You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@ignite.apache.org by ib...@apache.org on 2024/02/06 14:14:32 UTC

(ignite-3) branch main updated: IGNITE-21111 Add mechanism to wait for completion of in-flight operations of RW transactions started before index appearance (#3160)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 164eb449dc IGNITE-21111 Add mechanism to wait for completion of in-flight operations of RW transactions started before index appearance (#3160)
164eb449dc is described below

commit 164eb449dcb367afe5f8dd3f227862f77afcc567
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Feb 6 17:14:26 2024 +0300

    IGNITE-21111 Add mechanism to wait for completion of in-flight operations of RW transactions started before index appearance (#3160)
---
 .../internal/index/IndexBuildController.java       |   3 +-
 .../ignite/internal/index/IndexBuildTask.java      |  25 +++-
 .../apache/ignite/internal/index/IndexBuilder.java |   7 +-
 .../index/IndexAvailabilityControllerTest.java     |  18 ++-
 .../internal/index/IndexBuildControllerTest.java   |  23 +++-
 .../ignite/internal/index/IndexBuilderTest.java    |  26 +++-
 .../ignite/internal/replicator/ReplicaService.java |   8 +-
 .../request/BuildIndexReplicaRequest.java          |   3 +
 .../replicator/PartitionReplicaListener.java       |   7 +-
 .../replication/PartitionReplicaListenerTest.java  | 143 ++++++++++++++++-----
 10 files changed, 206 insertions(+), 57 deletions(-)

diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index cb1e9c461e..436bfa67ea 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -311,7 +311,8 @@ class IndexBuildController implements ManuallyCloseable {
                 indexStorage,
                 mvPartition,
                 localNode(),
-                enlistmentConsistencyToken
+                enlistmentConsistencyToken,
+                indexDescriptor.creationCatalogVersion()
         );
     }
 
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 135af9378e..bd4c523b69 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -35,11 +35,13 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
 
@@ -69,6 +71,8 @@ class IndexBuildTask {
 
     private final long enlistmentConsistencyToken;
 
+    private final int creationCatalogVersion;
+
     private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();
 
     private final AtomicBoolean taskStopGuard = new AtomicBoolean();
@@ -85,7 +89,8 @@ class IndexBuildTask {
             int batchSize,
             ClusterNode node,
             List<IndexBuildCompletionListener> listeners,
-            long enlistmentConsistencyToken
+            long enlistmentConsistencyToken,
+            int creationCatalogVersion
     ) {
         this.taskId = taskId;
         this.indexStorage = indexStorage;
@@ -98,6 +103,7 @@ class IndexBuildTask {
         // We do not intentionally make a copy of the list, we want to see changes in the passed list.
         this.listeners = listeners;
         this.enlistmentConsistencyToken = enlistmentConsistencyToken;
+        this.creationCatalogVersion = creationCatalogVersion;
     }
 
     /** Starts building the index. */
@@ -158,8 +164,15 @@ class IndexBuildTask {
             List<RowId> batchRowIds = createBatchRowIds();
 
             return replicaService.invoke(node, createBuildIndexReplicaRequest(batchRowIds))
-                    .thenComposeAsync(unused -> {
-                        if (indexStorage.getNextRowIdToBuild() == null) {
+                    .handleAsync((unused, throwable) -> {
+                        if (throwable != null) {
+                            Throwable cause = unwrapCause(throwable);
+
+                            // Read-write transaction operations have not yet completed, let's try to send the batch again.
+                            if (!(cause instanceof ReplicationTimeoutException)) {
+                                return CompletableFuture.<Void>failedFuture(cause);
+                            }
+                        } else if (indexStorage.getNextRowIdToBuild() == null) {
                             // Index has been built.
                             LOG.info("Index build completed: [{}]", createCommonIndexInfo());
 
@@ -167,11 +180,12 @@ class IndexBuildTask {
                                 listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
                             }
 
-                            return nullCompletedFuture();
+                            return CompletableFutures.<Void>nullCompletedFuture();
                         }
 
                         return handleNextBatch();
-                    }, executor);
+                    }, executor)
+                    .thenCompose(Function.identity());
         } catch (Throwable t) {
             return failedFuture(t);
         } finally {
@@ -208,6 +222,7 @@ class IndexBuildTask {
                 .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
                 .finish(finish)
                 .enlistmentConsistencyToken(enlistmentConsistencyToken)
+                .creationCatalogVersion(creationCatalogVersion)
                 .build();
     }
 
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index c5b1f61dfe..daa3f51375 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -96,6 +96,7 @@ class IndexBuilder implements ManuallyCloseable {
      * @param node Node to which requests to build the index will be sent.
      * @param enlistmentConsistencyToken Enlistment consistency token is used to check that the lease is still actual while the message goes
      *      to the replica.
+     * @param creationCatalogVersion Catalog version in which the index was created.
      */
     // TODO: IGNITE-19498 Perhaps we need to start building the index only once
     public void scheduleBuildIndex(
@@ -105,7 +106,8 @@ class IndexBuilder implements ManuallyCloseable {
             IndexStorage indexStorage,
             MvPartitionStorage partitionStorage,
             ClusterNode node,
-            long enlistmentConsistencyToken
+            long enlistmentConsistencyToken,
+            int creationCatalogVersion
     ) {
         inBusyLockSafe(busyLock, () -> {
             if (indexStorage.getNextRowIdToBuild() == null) {
@@ -128,7 +130,8 @@ class IndexBuilder implements ManuallyCloseable {
                     BATCH_SIZE,
                     node,
                     listeners,
-                    enlistmentConsistencyToken
+                    enlistmentConsistencyToken,
+                    creationCatalogVersion
             );
 
             IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, newTask);
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index c316626f7f..2e36085313 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -29,6 +29,7 @@ import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NA
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
@@ -191,7 +192,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
 
         startBuildIndex(indexId);
 
-        finishBuildingIndexForPartition(indexId, 0);
+        finishBuildingIndexForPartition(indexId, 0, indexCreationCatalogVersion(INDEX_NAME));
 
         awaitTillGlobalMetastoreRevisionIsApplied();
 
@@ -262,7 +263,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
 
         int indexId = indexId(INDEX_NAME);
 
-        finishBuildingIndexForPartition(indexId, 0);
+        finishBuildingIndexForPartition(indexId, 0, indexCreationCatalogVersion(INDEX_NAME));
 
         dropIndex(INDEX_NAME);
 
@@ -282,7 +283,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
         int indexId = indexId(INDEX_NAME);
 
         for (int partitionId = 1; partitionId < partitions; partitionId++) {
-            finishBuildingIndexForPartition(indexId, partitionId);
+            finishBuildingIndexForPartition(indexId, partitionId, indexCreationCatalogVersion(INDEX_NAME));
         }
 
         dropIndex(INDEX_NAME);
@@ -303,7 +304,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
         int indexId = indexId(INDEX_NAME);
 
         for (int partitionId = 2; partitionId < partitions; partitionId++) {
-            finishBuildingIndexForPartition(indexId, partitionId);
+            finishBuildingIndexForPartition(indexId, partitionId, indexCreationCatalogVersion(INDEX_NAME));
         }
 
         dropIndex(INDEX_NAME);
@@ -354,7 +355,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
         partitions = newPartitions;
     }
 
-    private void finishBuildingIndexForPartition(int indexId, int partitionId) {
+    private void finishBuildingIndexForPartition(int indexId, int partitionId, int indexCreationCatalogVersion) {
         // It may look complicated, but the other method through mocking IndexBuilder seems messier.
         IndexStorage indexStorage = mock(IndexStorage.class);
 
@@ -369,7 +370,8 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
                 indexStorage,
                 mock(MvPartitionStorage.class),
                 mock(ClusterNode.class),
-                ANY_ENLISTMENT_CONSISTENCY_TOKEN
+                ANY_ENLISTMENT_CONSISTENCY_TOKEN,
+                indexCreationCatalogVersion
         );
 
         CompletableFuture<Void> finishBuildIndexFuture = new CompletableFuture<>();
@@ -430,4 +432,8 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
     private static String partitionBuildIndexKey(int indexId, int partitionId) {
         return "indexBuild.partition." + indexId + "." + partitionId;
     }
+
+    private int indexCreationCatalogVersion(String indexName) {
+        return getIndexStrict(catalogManager, indexName, clock.nowLong()).creationCatalogVersion();
+    }
 }
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index f7167a36c1..4d7e197b47 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -30,6 +30,7 @@ import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NA
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
 import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
 import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -128,7 +129,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
                 any(),
                 any(),
                 eq(LOCAL_NODE),
-                anyLong()
+                anyLong(),
+                eq(indexCreationCatalogVersion(INDEX_NAME))
         );
     }
 
@@ -149,7 +151,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
                 any(),
                 any(),
                 eq(LOCAL_NODE),
-                anyLong()
+                anyLong(),
+                eq(indexCreationCatalogVersion(INDEX_NAME))
         );
     }
 
@@ -168,7 +171,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
                 any(),
                 any(),
                 eq(LOCAL_NODE),
-                anyLong()
+                anyLong(),
+                eq(indexCreationCatalogVersion(INDEX_NAME))
         );
     }
 
@@ -183,7 +187,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
                 any(),
                 any(),
                 eq(LOCAL_NODE),
-                anyLong()
+                anyLong(),
+                eq(indexCreationCatalogVersion(pkIndexName(TABLE_NAME)))
         );
     }
 
@@ -202,7 +207,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
                 any(),
                 any(),
                 eq(LOCAL_NODE),
-                anyLong()
+                anyLong(),
+                eq(indexCreationCatalogVersion(pkIndexName(tableName)))
         );
     }
 
@@ -244,7 +250,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
                 any(),
                 any(),
                 eq(LOCAL_NODE),
-                anyLong()
+                anyLong(),
+                anyInt()
         );
     }
 
@@ -296,4 +303,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
                 new TablePartitionId(tableId(), PARTITION_ID)
         );
     }
+
+    private int indexCreationCatalogVersion(String indexName) {
+        return getIndexStrict(catalogManager, indexName, clock.nowLong()).creationCatalogVersion();
+    }
 }
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index eaae2356d0..a499eeec96 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.index;
 
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
@@ -28,6 +29,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Collection;
@@ -38,10 +41,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.network.ClusterNode;
 import org.junit.jupiter.api.AfterEach;
@@ -57,6 +63,8 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest {
 
     private static final long ANY_ENLISTMENT_CONSISTENCY_TOKEN = 100500;
 
+    private static final int ANY_INDEX_CREATION_CATALOG_VERSION = 1;
+
     private final ReplicaService replicaService = mock(ReplicaService.class, invocation -> nullCompletedFuture());
 
     private final ExecutorService executorService = newSingleThreadExecutor();
@@ -126,6 +134,21 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest {
         assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully());
     }
 
+    @Test
+    void testIndexBuildWithReplicationTimeoutException() {
+        CompletableFuture<Void> listenCompletionIndexBuildingFuture = listenCompletionIndexBuilding(INDEX_ID, TABLE_ID, PARTITION_ID);
+
+        when(replicaService.invoke(any(ClusterNode.class), any()))
+                .thenReturn(failedFuture(new ReplicationTimeoutException(new TablePartitionId(TABLE_ID, PARTITION_ID))))
+                .thenReturn(nullCompletedFuture());
+
+        scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of(rowId(PARTITION_ID)));
+
+        assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully());
+
+        verify(replicaService, times(2)).invoke(any(ClusterNode.class), any(BuildIndexReplicaRequest.class));
+    }
+
     private void scheduleBuildIndex(int indexId, int tableId, int partitionId, Collection<RowId> nextRowIdsToBuild) {
         indexBuilder.scheduleBuildIndex(
                 tableId,
@@ -134,7 +157,8 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest {
                 indexStorage(nextRowIdsToBuild),
                 mock(MvPartitionStorage.class),
                 mock(ClusterNode.class),
-                ANY_ENLISTMENT_CONSISTENCY_TOKEN
+                ANY_ENLISTMENT_CONSISTENCY_TOKEN,
+                ANY_INDEX_CREATION_CATALOG_VERSION
         );
     }
 
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 731ff50b4a..1a36116972 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -43,9 +43,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.network.ClusterNode;
 
-/**
- * The service is intended to execute requests on replicas.
- */
+/** The service is intended to execute requests on replicas. */
 public class ReplicaService {
     /** Network timeout. */
     private static final long RPC_TIMEOUT = 3000;
@@ -84,6 +82,7 @@ public class ReplicaService {
      * @return Response future with either evaluation result or completed exceptionally.
      * @see NodeStoppingException If either supplier or demander node is stopping.
      * @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
+     * @see ReplicationTimeoutException If the response could not be received due to a timeout.
      */
     private <R> CompletableFuture<R> sendToReplica(String targetNodeConsistentId, ReplicaRequest req) {
         CompletableFuture<R> res = new CompletableFuture<>();
@@ -190,6 +189,7 @@ public class ReplicaService {
      * @return Response future with either evaluation result or completed exceptionally.
      * @see NodeStoppingException If either supplier or demander node is stopping.
      * @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
+     * @see ReplicationTimeoutException If the response could not be received due to a timeout.
      */
     public <R> CompletableFuture<R> invoke(ClusterNode node, ReplicaRequest request) {
         return sendToReplica(node.name(), request);
@@ -203,6 +203,7 @@ public class ReplicaService {
      * @return Response future with either evaluation result or completed exceptionally.
      * @see NodeStoppingException If either supplier or demander node is stopping.
      * @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
+     * @see ReplicationTimeoutException If the response could not be received due to a timeout.
      */
     public <R> CompletableFuture<R> invoke(String replicaConsistentId, ReplicaRequest request) {
         return sendToReplica(replicaConsistentId, request);
@@ -217,6 +218,7 @@ public class ReplicaService {
      * @return Response future with either evaluation result or completed exceptionally.
      * @see NodeStoppingException If either supplier or demander node is stopping.
      * @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
+     * @see ReplicationTimeoutException If the response could not be received due to a timeout.
      */
     public <R> CompletableFuture<R> invoke(ClusterNode node, ReplicaRequest request, String storageId) {
         return sendToReplica(node.name(), request);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
index e737d00b2d..76a38a0e29 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
@@ -39,4 +39,7 @@ public interface BuildIndexReplicaRequest extends PrimaryReplicaRequest {
 
     /** Returns {@code true} if this batch is the last one. */
     boolean finish();
+
+    /** Return catalog version in which the index was created. */
+    int creationCatalogVersion();
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ed0d733ee8..f91b8aeb0a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -645,7 +645,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         } else if (request instanceof ReplicaSafeTimeSyncRequest) {
             return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request, isPrimary);
         } else if (request instanceof BuildIndexReplicaRequest) {
-            return raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
+            return processBuildIndexReplicaRequest((BuildIndexReplicaRequest) request);
         } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
             return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request, opStartTsIfDirectRo);
         } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
@@ -3796,4 +3796,9 @@ public class PartitionReplicaListener implements ReplicaListener {
             busyLock.leaveBusy();
         }
     }
+
+    private CompletableFuture<?> processBuildIndexReplicaRequest(BuildIndexReplicaRequest request) {
+        return txRwOperationTracker.awaitCompleteTxRwOperations(request.creationCatalogVersion())
+                .thenCompose(unused -> raftClient.run(toBuildIndexCommand(request)));
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index cc60c1e039..b2150df118 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -139,9 +139,9 @@ import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.CatalogVersionAware;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
-import org.apache.ignite.internal.table.distributed.command.PartitionCommand;
 import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommandImpl;
@@ -150,6 +150,7 @@ import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
+import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
@@ -242,6 +243,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     private static final int ANOTHER_TABLE_ID = 2;
 
+    private static final long ANY_ENLISTMENT_CONSISTENCY_TOKEN = 1L;
+
     private final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>();
 
     /** The storage stores partition data. */
@@ -249,9 +252,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     private final LockManager lockManager = new HeapLockManager();
 
-    private final Function<PartitionCommand, CompletableFuture<?>> defaultMockRaftFutureClosure = cmd -> {
+    private final Function<Command, CompletableFuture<?>> defaultMockRaftFutureClosure = cmd -> {
         if (cmd instanceof WriteIntentSwitchCommand) {
-            Set<RowId> rows = pendingRows.remove(cmd.txId());
+            UUID txId = ((WriteIntentSwitchCommand) cmd).txId();
+
+            Set<RowId> rows = pendingRows.remove(txId);
 
             HybridTimestamp commitTimestamp = ((WriteIntentSwitchCommand) cmd).commitTimestamp();
             assertNotNull(commitTimestamp);
@@ -262,9 +267,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 }
             }
 
-            lockManager.releaseAll(cmd.txId());
+            lockManager.releaseAll(txId);
         } else if (cmd instanceof UpdateCommand) {
-            pendingRows.compute(cmd.txId(), (txId, v) -> {
+            UUID txId = ((UpdateCommand) cmd).txId();
+
+            pendingRows.compute(txId, (txId0, v) -> {
                 if (v == null) {
                     v = new HashSet<>();
                 }
@@ -384,7 +391,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     /** Secondary hash index. */
     private TableSchemaAwareIndexStorage hashIndexStorage;
 
-    private Function<PartitionCommand, CompletableFuture<?>> raftClientFutureClosure = defaultMockRaftFutureClosure;
+    private Function<Command, CompletableFuture<?>> raftClientFutureClosure = defaultMockRaftFutureClosure;
 
     private static final AtomicInteger nextMonotonicInt = new AtomicInteger(1);
 
@@ -616,7 +623,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(grpId)
                 .txId(newTxId())
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build(), "senderId");
 
         TransactionMeta txMeta = (TransactionMeta) fut.get(1, TimeUnit.SECONDS).result();
@@ -637,7 +644,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(grpId)
                 .txId(txId)
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build(), localNode.id());
 
         TransactionMeta txMeta = (TransactionMeta) fut.get(1, TimeUnit.SECONDS).result();
@@ -672,7 +679,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(grpId)
                 .txId(newTxId())
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build(), localNode.id());
 
         assertThrowsWithCause(
@@ -713,7 +720,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .schemaVersion(pk.schemaVersion())
                 .primaryKey(pk.tupleSlice())
                 .requestType(RequestType.RO_GET)
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build();
 
         return partitionReplicaListener.invoke(request, localNode.id());
@@ -820,7 +827,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .groupId(grpId)
                         .transactionId(scanTxId)
                         .timestampLong(clock.nowLong())
-                        .enlistmentConsistencyToken(1L)
+                        .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .scanId(1L)
                         .indexToUse(sortedIndexId)
                         .batchSize(4)
@@ -838,7 +845,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .transactionId(scanTxId)
                 .timestampLong(clock.nowLong())
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .scanId(1L)
                 .indexToUse(sortedIndexId)
                 .batchSize(4)
@@ -856,7 +863,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .transactionId(newTxId())
                 .timestampLong(clock.nowLong())
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
                 .lowerBoundPrefix(toIndexBound(1))
@@ -877,7 +884,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .transactionId(newTxId())
                 .timestampLong(clock.nowLong())
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
                 .lowerBoundPrefix(toIndexBound(5))
@@ -896,7 +903,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groupId(grpId)
                 .transactionId(newTxId())
                 .timestampLong(clock.nowLong())
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
                 .exactKey(toIndexKey(0))
@@ -1220,7 +1227,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .requestType(requestType)
                         .schemaVersion(binaryRow.schemaVersion())
                         .binaryTuple(binaryRow.tupleSlice())
-                        .enlistmentConsistencyToken(1L)
+                        .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .commitPartitionId(commitPartitionId())
                         .coordinatorId(localNode.id())
                         .full(full)
@@ -1240,7 +1247,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .requestType(requestType)
                         .schemaVersion(binaryRow.schemaVersion())
                         .primaryKey(binaryRow.tupleSlice())
-                        .enlistmentConsistencyToken(1L)
+                        .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .commitPartitionId(commitPartitionId())
                         .coordinatorId(localNode.id())
                         .full(full)
@@ -1267,7 +1274,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .requestType(requestType)
                         .schemaVersion(binaryRows.iterator().next().schemaVersion())
                         .binaryTuples(binaryRowsToBuffers(binaryRows))
-                        .enlistmentConsistencyToken(1L)
+                        .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .commitPartitionId(commitPartitionId())
                         .coordinatorId(localNode.id())
                         .full(full)
@@ -1291,7 +1298,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .requestType(requestType)
                         .schemaVersion(binaryRows.iterator().next().schemaVersion())
                         .primaryKeys(binaryRowsToBuffers(binaryRows))
-                        .enlistmentConsistencyToken(1L)
+                        .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .commitPartitionId(commitPartitionId())
                         .coordinatorId(localNode.id())
                         .full(full)
@@ -1316,7 +1323,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                             .requestType(RequestType.RW_INSERT)
                             .schemaVersion(binaryRow.schemaVersion())
                             .binaryTuple(binaryRow.tupleSlice())
-                            .enlistmentConsistencyToken(1L)
+                            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                             .commitPartitionId(commitPartitionId())
                             .coordinatorId(localNode.id())
                             .build();
@@ -1345,7 +1352,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                             .requestType(RequestType.RW_UPSERT_ALL)
                             .schemaVersion(binaryRow0.schemaVersion())
                             .binaryTuples(asList(binaryRow0.tupleSlice(), binaryRow1.tupleSlice()))
-                            .enlistmentConsistencyToken(1L)
+                            .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                             .commitPartitionId(commitPartitionId())
                             .coordinatorId(localNode.id())
                             .build();
@@ -1591,7 +1598,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .txId(txId)
                 .groups(Map.of(grpId, localNode.name()))
                 .commit(false)
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build();
 
         return partitionReplicaListener.invoke(commitRequest, localNode.id());
@@ -1656,7 +1663,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .groups(Map.of(grpId, localNode.name()))
                 .commit(true)
                 .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build();
 
         return partitionReplicaListener.invoke(commitRequest, localNode.id());
@@ -1836,7 +1843,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .schemaVersion(oldRow.schemaVersion())
                         .oldBinaryTuple(oldRow.tupleSlice())
                         .newBinaryTuple(newRow.tupleSlice())
-                        .enlistmentConsistencyToken(1L)
+                        .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .commitPartitionId(commitPartitionId())
                         .coordinatorId(localNode.id())
                         .full(full)
@@ -1854,7 +1861,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                                 .transactionId(targetTxId)
                                 .indexToUse(sortedIndexStorage.id())
                                 .exactKey(toIndexKey(FUTURE_SCHEMA_ROW_INDEXED_VALUE))
-                                .enlistmentConsistencyToken(1L)
+                                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                                 .scanId(1)
                                 .batchSize(100)
                                 .commitPartitionId(commitPartitionId())
@@ -1873,7 +1880,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                                 .groupId(grpId)
                                 .transactionId(targetTxId)
                                 .indexToUse(sortedIndexStorage.id())
-                                .enlistmentConsistencyToken(1L)
+                                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                                 .scanId(1)
                                 .batchSize(100)
                                 .commitPartitionId(commitPartitionId())
@@ -1896,7 +1903,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                         .groupId(grpId)
                         .transactionId(targetTxId)
-                        .enlistmentConsistencyToken(1L)
+                        .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .scanId(1)
                         .batchSize(100)
                         .full(false)
@@ -2384,7 +2391,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .groupId(commitPartitionId)
                         .groups(groups)
                         .txId(txId)
-                        .enlistmentConsistencyToken(1L)
+                        .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .commit(true)
                         .commitTimestampLong(clock.nowLong())
                         .build(),
@@ -2524,15 +2531,20 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow row) {
+        return upsertAsync(txId, row, false);
+    }
+
+    private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow row, boolean full) {
         ReadWriteSingleRowReplicaRequest message = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                 .groupId(grpId)
                 .requestType(RequestType.RW_UPSERT)
                 .transactionId(txId)
                 .schemaVersion(row.schemaVersion())
                 .binaryTuple(row.tupleSlice())
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .commitPartitionId(commitPartitionId())
                 .coordinatorId(localNode.id())
+                .full(full)
                 .build();
 
         return partitionReplicaListener.invoke(message, localNode.id());
@@ -2545,7 +2557,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .transactionId(txId)
                 .schemaVersion(row.schemaVersion())
                 .primaryKey(row.tupleSlice())
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .commitPartitionId(commitPartitionId())
                 .coordinatorId(localNode.id())
                 .build();
@@ -2599,7 +2611,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .requestType(RequestType.RO_GET_ALL)
                 .schemaVersion(rows.iterator().next().schemaVersion())
                 .primaryKeys(binaryRowsToBuffers(rows))
-                .enlistmentConsistencyToken(1L)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build();
 
         return partitionReplicaListener.invoke(request, localNode.id());
@@ -2782,7 +2794,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<?> invoke(UUID targetTxId, HybridTimestamp readTimestamp, TestKey key);
     }
 
-    @ParameterizedTest
+    @ParameterizedTest(name = "readOnly = {0}")
     @ValueSource(booleans = {true, false})
     void testStaleTxOperationAfterIndexStartBuilding(boolean readOnly) {
         fireHashIndexStartBuildingEventForStaleTxOperation();
@@ -2801,6 +2813,61 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         }
     }
 
+    @Test
+    void testBuildIndexReplicaRequestWithoutRwTxOperations() {
+        // Until the hash index start-building event is fired, the request must stay pending.
+        CompletableFuture<?> firstBuildRequest = invokeBuildIndexReplicaRequestAsync(1, 1);
+        assertFalse(firstBuildRequest.isDone());
+
+        fireHashIndexStartBuildingEventForStaleTxOperation();
+        // No RW transaction operations are in flight, so both the pending and a fresh request complete.
+        assertThat(firstBuildRequest, willCompleteSuccessfully());
+        assertThat(invokeBuildIndexReplicaRequestAsync(1, 1), willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest(name = "failCmd = {0}")
+    @ValueSource(booleans = {false, true})
+    void testBuildIndexReplicaRequest(boolean failCmd) {
+        // Raft commands other than BuildIndexCommand are held back until this future is completed.
+        var releaseOtherCommands = new CompletableFuture<Void>();
+        when(mockRaftClient.run(any())).thenAnswer(inv -> {
+            Command command = inv.getArgument(0);
+
+            if (command instanceof BuildIndexCommand) {
+                return raftClientFutureClosure.apply(command);
+            } else {
+                return releaseOtherCommands.thenCompose(ignored -> raftClientFutureClosure.apply(command));
+            }
+        });
+
+        // The RW transaction is mapped to catalog version 0, so it begins before the index, whose
+        // creation catalog version passed below is 1.
+        UUID txId = newTxId();
+        long txBeginTs = beginTimestamp(txId).longValue();
+        when(catalogService.activeCatalogVersion(eq(txBeginTs))).thenReturn(0);
+
+        BinaryRow row = binaryRow(0);
+        CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row, true);
+        CompletableFuture<?> buildIndexFuture = invokeBuildIndexReplicaRequestAsync(1, 1);
+
+        fireHashIndexStartBuildingEventForStaleTxOperation();
+
+        // The upsert's raft command is still held back, so the build request has to keep waiting.
+        assertFalse(upsertFuture.isDone());
+        assertFalse(buildIndexFuture.isDone());
+
+        if (!failCmd) {
+            releaseOtherCommands.complete(null);
+            assertThat(upsertFuture, willCompleteSuccessfully());
+        } else {
+            releaseOtherCommands.completeExceptionally(new RuntimeException("error from test"));
+            assertThat(upsertFuture, willThrow(RuntimeException.class));
+        }
+
+        // The build request is released whether the in-flight operation succeeded or failed.
+        assertThat(buildIndexFuture, willCompleteSuccessfully());
+    }
+
     private void fireHashIndexStartBuildingEventForStaleTxOperation() {
         CatalogHashIndexDescriptor hashIndexDescriptor = mock(CatalogHashIndexDescriptor.class);
 
@@ -2818,4 +2885,16 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 willCompleteSuccessfully()
         );
     }
+
+    private CompletableFuture<?> invokeBuildIndexReplicaRequestAsync(int indexId, int indexCreationCatalogVersion) {
+        BuildIndexReplicaRequest buildIndexRequest = TABLE_MESSAGES_FACTORY.buildIndexReplicaRequest()
+                .groupId(grpId)
+                .indexId(indexId)
+                .creationCatalogVersion(indexCreationCatalogVersion)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
+                .rowIds(List.of())
+                .build();
+        /* Sends the request (with an empty row ID list) to the listener under test as the local node. */
+        return partitionReplicaListener.invoke(buildIndexRequest, localNode.id());
+    }
 }