You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2024/02/06 14:14:32 UTC
(ignite-3) branch main updated: IGNITE-21111 Add mechanism to wait for completion of in-flight operations of RW transactions started before index appearance (#3160)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 164eb449dc IGNITE-21111 Add mechanism to wait for completion of in-flight operations of RW transactions started before index appearance (#3160)
164eb449dc is described below
commit 164eb449dcb367afe5f8dd3f227862f77afcc567
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Feb 6 17:14:26 2024 +0300
IGNITE-21111 Add mechanism to wait for completion of in-flight operations of RW transactions started before index appearance (#3160)
---
.../internal/index/IndexBuildController.java | 3 +-
.../ignite/internal/index/IndexBuildTask.java | 25 +++-
.../apache/ignite/internal/index/IndexBuilder.java | 7 +-
.../index/IndexAvailabilityControllerTest.java | 18 ++-
.../internal/index/IndexBuildControllerTest.java | 23 +++-
.../ignite/internal/index/IndexBuilderTest.java | 26 +++-
.../ignite/internal/replicator/ReplicaService.java | 8 +-
.../request/BuildIndexReplicaRequest.java | 3 +
.../replicator/PartitionReplicaListener.java | 7 +-
.../replication/PartitionReplicaListenerTest.java | 143 ++++++++++++++++-----
10 files changed, 206 insertions(+), 57 deletions(-)
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index cb1e9c461e..436bfa67ea 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -311,7 +311,8 @@ class IndexBuildController implements ManuallyCloseable {
indexStorage,
mvPartition,
localNode(),
- enlistmentConsistencyToken
+ enlistmentConsistencyToken,
+ indexDescriptor.creationCatalogVersion()
);
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 135af9378e..bd4c523b69 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -35,11 +35,13 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
@@ -69,6 +71,8 @@ class IndexBuildTask {
private final long enlistmentConsistencyToken;
+ private final int creationCatalogVersion;
+
private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();
private final AtomicBoolean taskStopGuard = new AtomicBoolean();
@@ -85,7 +89,8 @@ class IndexBuildTask {
int batchSize,
ClusterNode node,
List<IndexBuildCompletionListener> listeners,
- long enlistmentConsistencyToken
+ long enlistmentConsistencyToken,
+ int creationCatalogVersion
) {
this.taskId = taskId;
this.indexStorage = indexStorage;
@@ -98,6 +103,7 @@ class IndexBuildTask {
// We do not intentionally make a copy of the list, we want to see changes in the passed list.
this.listeners = listeners;
this.enlistmentConsistencyToken = enlistmentConsistencyToken;
+ this.creationCatalogVersion = creationCatalogVersion;
}
/** Starts building the index. */
@@ -158,8 +164,15 @@ class IndexBuildTask {
List<RowId> batchRowIds = createBatchRowIds();
return replicaService.invoke(node, createBuildIndexReplicaRequest(batchRowIds))
- .thenComposeAsync(unused -> {
- if (indexStorage.getNextRowIdToBuild() == null) {
+ .handleAsync((unused, throwable) -> {
+ if (throwable != null) {
+ Throwable cause = unwrapCause(throwable);
+
+ // A timeout means read-write transaction operations have not completed yet, so send the batch again.
+ if (!(cause instanceof ReplicationTimeoutException)) {
+ return CompletableFuture.<Void>failedFuture(cause);
+ }
+ } else if (indexStorage.getNextRowIdToBuild() == null) {
// Index has been built.
LOG.info("Index build completed: [{}]", createCommonIndexInfo());
@@ -167,11 +180,12 @@ class IndexBuildTask {
listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
}
- return nullCompletedFuture();
+ return CompletableFutures.<Void>nullCompletedFuture();
}
return handleNextBatch();
- }, executor);
+ }, executor)
+ .thenCompose(Function.identity());
} catch (Throwable t) {
return failedFuture(t);
} finally {
@@ -208,6 +222,7 @@ class IndexBuildTask {
.rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
.finish(finish)
.enlistmentConsistencyToken(enlistmentConsistencyToken)
+ .creationCatalogVersion(creationCatalogVersion)
.build();
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index c5b1f61dfe..daa3f51375 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -96,6 +96,7 @@ class IndexBuilder implements ManuallyCloseable {
* @param node Node to which requests to build the index will be sent.
* @param enlistmentConsistencyToken Enlistment consistency token is used to check that the lease is still actual while the message goes
* to the replica.
+ * @param creationCatalogVersion Catalog version in which the index was created.
*/
// TODO: IGNITE-19498 Perhaps we need to start building the index only once
public void scheduleBuildIndex(
@@ -105,7 +106,8 @@ class IndexBuilder implements ManuallyCloseable {
IndexStorage indexStorage,
MvPartitionStorage partitionStorage,
ClusterNode node,
- long enlistmentConsistencyToken
+ long enlistmentConsistencyToken,
+ int creationCatalogVersion
) {
inBusyLockSafe(busyLock, () -> {
if (indexStorage.getNextRowIdToBuild() == null) {
@@ -128,7 +130,8 @@ class IndexBuilder implements ManuallyCloseable {
BATCH_SIZE,
node,
listeners,
- enlistmentConsistencyToken
+ enlistmentConsistencyToken,
+ creationCatalogVersion
);
IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, newTask);
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index c316626f7f..2e36085313 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -29,6 +29,7 @@ import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NA
import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
@@ -191,7 +192,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
startBuildIndex(indexId);
- finishBuildingIndexForPartition(indexId, 0);
+ finishBuildingIndexForPartition(indexId, 0, indexCreationCatalogVersion(INDEX_NAME));
awaitTillGlobalMetastoreRevisionIsApplied();
@@ -262,7 +263,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
int indexId = indexId(INDEX_NAME);
- finishBuildingIndexForPartition(indexId, 0);
+ finishBuildingIndexForPartition(indexId, 0, indexCreationCatalogVersion(INDEX_NAME));
dropIndex(INDEX_NAME);
@@ -282,7 +283,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
int indexId = indexId(INDEX_NAME);
for (int partitionId = 1; partitionId < partitions; partitionId++) {
- finishBuildingIndexForPartition(indexId, partitionId);
+ finishBuildingIndexForPartition(indexId, partitionId, indexCreationCatalogVersion(INDEX_NAME));
}
dropIndex(INDEX_NAME);
@@ -303,7 +304,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
int indexId = indexId(INDEX_NAME);
for (int partitionId = 2; partitionId < partitions; partitionId++) {
- finishBuildingIndexForPartition(indexId, partitionId);
+ finishBuildingIndexForPartition(indexId, partitionId, indexCreationCatalogVersion(INDEX_NAME));
}
dropIndex(INDEX_NAME);
@@ -354,7 +355,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
partitions = newPartitions;
}
- private void finishBuildingIndexForPartition(int indexId, int partitionId) {
+ private void finishBuildingIndexForPartition(int indexId, int partitionId, int indexCreationCatalogVersion) {
// It may look complicated, but the other method through mocking IndexBuilder seems messier.
IndexStorage indexStorage = mock(IndexStorage.class);
@@ -369,7 +370,8 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
indexStorage,
mock(MvPartitionStorage.class),
mock(ClusterNode.class),
- ANY_ENLISTMENT_CONSISTENCY_TOKEN
+ ANY_ENLISTMENT_CONSISTENCY_TOKEN,
+ indexCreationCatalogVersion
);
CompletableFuture<Void> finishBuildIndexFuture = new CompletableFuture<>();
@@ -430,4 +432,8 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
private static String partitionBuildIndexKey(int indexId, int partitionId) {
return "indexBuild.partition." + indexId + "." + partitionId;
}
+
+ private int indexCreationCatalogVersion(String indexName) {
+ return getIndexStrict(catalogManager, indexName, clock.nowLong()).creationCatalogVersion();
+ }
}
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index f7167a36c1..4d7e197b47 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -30,6 +30,7 @@ import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NA
import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -128,7 +129,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
any(),
any(),
eq(LOCAL_NODE),
- anyLong()
+ anyLong(),
+ eq(indexCreationCatalogVersion(INDEX_NAME))
);
}
@@ -149,7 +151,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
any(),
any(),
eq(LOCAL_NODE),
- anyLong()
+ anyLong(),
+ eq(indexCreationCatalogVersion(INDEX_NAME))
);
}
@@ -168,7 +171,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
any(),
any(),
eq(LOCAL_NODE),
- anyLong()
+ anyLong(),
+ eq(indexCreationCatalogVersion(INDEX_NAME))
);
}
@@ -183,7 +187,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
any(),
any(),
eq(LOCAL_NODE),
- anyLong()
+ anyLong(),
+ eq(indexCreationCatalogVersion(pkIndexName(TABLE_NAME)))
);
}
@@ -202,7 +207,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
any(),
any(),
eq(LOCAL_NODE),
- anyLong()
+ anyLong(),
+ eq(indexCreationCatalogVersion(pkIndexName(tableName)))
);
}
@@ -244,7 +250,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
any(),
any(),
eq(LOCAL_NODE),
- anyLong()
+ anyLong(),
+ anyInt()
);
}
@@ -296,4 +303,8 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
new TablePartitionId(tableId(), PARTITION_ID)
);
}
+
+ private int indexCreationCatalogVersion(String indexName) {
+ return getIndexStrict(catalogManager, indexName, clock.nowLong()).creationCatalogVersion();
+ }
}
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index eaae2356d0..a499eeec96 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.index;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
@@ -28,6 +29,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collection;
@@ -38,10 +41,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.AfterEach;
@@ -57,6 +63,8 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest {
private static final long ANY_ENLISTMENT_CONSISTENCY_TOKEN = 100500;
+ private static final int ANY_INDEX_CREATION_CATALOG_VERSION = 1;
+
private final ReplicaService replicaService = mock(ReplicaService.class, invocation -> nullCompletedFuture());
private final ExecutorService executorService = newSingleThreadExecutor();
@@ -126,6 +134,21 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest {
assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully());
}
+ @Test
+ void testIndexBuildWithReplicationTimeoutException() {
+ CompletableFuture<Void> listenCompletionIndexBuildingFuture = listenCompletionIndexBuilding(INDEX_ID, TABLE_ID, PARTITION_ID);
+
+ when(replicaService.invoke(any(ClusterNode.class), any()))
+ .thenReturn(failedFuture(new ReplicationTimeoutException(new TablePartitionId(TABLE_ID, PARTITION_ID))))
+ .thenReturn(nullCompletedFuture());
+
+ scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of(rowId(PARTITION_ID)));
+
+ assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully());
+
+ verify(replicaService, times(2)).invoke(any(ClusterNode.class), any(BuildIndexReplicaRequest.class));
+ }
+
private void scheduleBuildIndex(int indexId, int tableId, int partitionId, Collection<RowId> nextRowIdsToBuild) {
indexBuilder.scheduleBuildIndex(
tableId,
@@ -134,7 +157,8 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest {
indexStorage(nextRowIdsToBuild),
mock(MvPartitionStorage.class),
mock(ClusterNode.class),
- ANY_ENLISTMENT_CONSISTENCY_TOKEN
+ ANY_ENLISTMENT_CONSISTENCY_TOKEN,
+ ANY_INDEX_CREATION_CATALOG_VERSION
);
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 731ff50b4a..1a36116972 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -43,9 +43,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.network.ClusterNode;
-/**
- * The service is intended to execute requests on replicas.
- */
+/** The service is intended to execute requests on replicas. */
public class ReplicaService {
/** Network timeout. */
private static final long RPC_TIMEOUT = 3000;
@@ -84,6 +82,7 @@ public class ReplicaService {
* @return Response future with either evaluation result or completed exceptionally.
* @see NodeStoppingException If either supplier or demander node is stopping.
* @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
+ * @see ReplicationTimeoutException If the response could not be received due to a timeout.
*/
private <R> CompletableFuture<R> sendToReplica(String targetNodeConsistentId, ReplicaRequest req) {
CompletableFuture<R> res = new CompletableFuture<>();
@@ -190,6 +189,7 @@ public class ReplicaService {
* @return Response future with either evaluation result or completed exceptionally.
* @see NodeStoppingException If either supplier or demander node is stopping.
* @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
+ * @see ReplicationTimeoutException If the response could not be received due to a timeout.
*/
public <R> CompletableFuture<R> invoke(ClusterNode node, ReplicaRequest request) {
return sendToReplica(node.name(), request);
@@ -203,6 +203,7 @@ public class ReplicaService {
* @return Response future with either evaluation result or completed exceptionally.
* @see NodeStoppingException If either supplier or demander node is stopping.
* @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
+ * @see ReplicationTimeoutException If the response could not be received due to a timeout.
*/
public <R> CompletableFuture<R> invoke(String replicaConsistentId, ReplicaRequest request) {
return sendToReplica(replicaConsistentId, request);
@@ -217,6 +218,7 @@ public class ReplicaService {
* @return Response future with either evaluation result or completed exceptionally.
* @see NodeStoppingException If either supplier or demander node is stopping.
* @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
+ * @see ReplicationTimeoutException If the response could not be received due to a timeout.
*/
public <R> CompletableFuture<R> invoke(ClusterNode node, ReplicaRequest request, String storageId) {
return sendToReplica(node.name(), request);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
index e737d00b2d..76a38a0e29 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
@@ -39,4 +39,7 @@ public interface BuildIndexReplicaRequest extends PrimaryReplicaRequest {
/** Returns {@code true} if this batch is the last one. */
boolean finish();
+
+ /** Returns the catalog version in which the index was created. */
+ int creationCatalogVersion();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ed0d733ee8..f91b8aeb0a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -645,7 +645,7 @@ public class PartitionReplicaListener implements ReplicaListener {
} else if (request instanceof ReplicaSafeTimeSyncRequest) {
return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request, isPrimary);
} else if (request instanceof BuildIndexReplicaRequest) {
- return raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
+ return processBuildIndexReplicaRequest((BuildIndexReplicaRequest) request);
} else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request, opStartTsIfDirectRo);
} else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
@@ -3796,4 +3796,9 @@ public class PartitionReplicaListener implements ReplicaListener {
busyLock.leaveBusy();
}
}
+
+ private CompletableFuture<?> processBuildIndexReplicaRequest(BuildIndexReplicaRequest request) {
+ return txRwOperationTracker.awaitCompleteTxRwOperations(request.creationCatalogVersion())
+ .thenCompose(unused -> raftClient.run(toBuildIndexCommand(request)));
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index cc60c1e039..b2150df118 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -139,9 +139,9 @@ import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.CatalogVersionAware;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
-import org.apache.ignite.internal.table.distributed.command.PartitionCommand;
import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommandImpl;
@@ -150,6 +150,7 @@ import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
+import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
@@ -242,6 +243,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static final int ANOTHER_TABLE_ID = 2;
+ private static final long ANY_ENLISTMENT_CONSISTENCY_TOKEN = 1L;
+
private final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>();
/** The storage stores partition data. */
@@ -249,9 +252,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private final LockManager lockManager = new HeapLockManager();
- private final Function<PartitionCommand, CompletableFuture<?>> defaultMockRaftFutureClosure = cmd -> {
+ private final Function<Command, CompletableFuture<?>> defaultMockRaftFutureClosure = cmd -> {
if (cmd instanceof WriteIntentSwitchCommand) {
- Set<RowId> rows = pendingRows.remove(cmd.txId());
+ UUID txId = ((WriteIntentSwitchCommand) cmd).txId();
+
+ Set<RowId> rows = pendingRows.remove(txId);
HybridTimestamp commitTimestamp = ((WriteIntentSwitchCommand) cmd).commitTimestamp();
assertNotNull(commitTimestamp);
@@ -262,9 +267,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
}
- lockManager.releaseAll(cmd.txId());
+ lockManager.releaseAll(txId);
} else if (cmd instanceof UpdateCommand) {
- pendingRows.compute(cmd.txId(), (txId, v) -> {
+ UUID txId = ((UpdateCommand) cmd).txId();
+
+ pendingRows.compute(txId, (txId0, v) -> {
if (v == null) {
v = new HashSet<>();
}
@@ -384,7 +391,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** Secondary hash index. */
private TableSchemaAwareIndexStorage hashIndexStorage;
- private Function<PartitionCommand, CompletableFuture<?>> raftClientFutureClosure = defaultMockRaftFutureClosure;
+ private Function<Command, CompletableFuture<?>> raftClientFutureClosure = defaultMockRaftFutureClosure;
private static final AtomicInteger nextMonotonicInt = new AtomicInteger(1);
@@ -616,7 +623,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(grpId)
.txId(newTxId())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build(), "senderId");
TransactionMeta txMeta = (TransactionMeta) fut.get(1, TimeUnit.SECONDS).result();
@@ -637,7 +644,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(grpId)
.txId(txId)
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build(), localNode.id());
TransactionMeta txMeta = (TransactionMeta) fut.get(1, TimeUnit.SECONDS).result();
@@ -672,7 +679,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(grpId)
.txId(newTxId())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build(), localNode.id());
assertThrowsWithCause(
@@ -713,7 +720,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.schemaVersion(pk.schemaVersion())
.primaryKey(pk.tupleSlice())
.requestType(RequestType.RO_GET)
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build();
return partitionReplicaListener.invoke(request, localNode.id());
@@ -820,7 +827,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(scanTxId)
.timestampLong(clock.nowLong())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
@@ -838,7 +845,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(scanTxId)
.timestampLong(clock.nowLong())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
@@ -856,7 +863,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(newTxId())
.timestampLong(clock.nowLong())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(2L)
.indexToUse(sortedIndexId)
.lowerBoundPrefix(toIndexBound(1))
@@ -877,7 +884,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(newTxId())
.timestampLong(clock.nowLong())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(2L)
.indexToUse(sortedIndexId)
.lowerBoundPrefix(toIndexBound(5))
@@ -896,7 +903,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(newTxId())
.timestampLong(clock.nowLong())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(2L)
.indexToUse(sortedIndexId)
.exactKey(toIndexKey(0))
@@ -1220,7 +1227,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.requestType(requestType)
.schemaVersion(binaryRow.schemaVersion())
.binaryTuple(binaryRow.tupleSlice())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.full(full)
@@ -1240,7 +1247,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.requestType(requestType)
.schemaVersion(binaryRow.schemaVersion())
.primaryKey(binaryRow.tupleSlice())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.full(full)
@@ -1267,7 +1274,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.requestType(requestType)
.schemaVersion(binaryRows.iterator().next().schemaVersion())
.binaryTuples(binaryRowsToBuffers(binaryRows))
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.full(full)
@@ -1291,7 +1298,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.requestType(requestType)
.schemaVersion(binaryRows.iterator().next().schemaVersion())
.primaryKeys(binaryRowsToBuffers(binaryRows))
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.full(full)
@@ -1316,7 +1323,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.requestType(RequestType.RW_INSERT)
.schemaVersion(binaryRow.schemaVersion())
.binaryTuple(binaryRow.tupleSlice())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.build();
@@ -1345,7 +1352,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.requestType(RequestType.RW_UPSERT_ALL)
.schemaVersion(binaryRow0.schemaVersion())
.binaryTuples(asList(binaryRow0.tupleSlice(), binaryRow1.tupleSlice()))
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.build();
@@ -1591,7 +1598,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.txId(txId)
.groups(Map.of(grpId, localNode.name()))
.commit(false)
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build();
return partitionReplicaListener.invoke(commitRequest, localNode.id());
@@ -1656,7 +1663,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groups(Map.of(grpId, localNode.name()))
.commit(true)
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build();
return partitionReplicaListener.invoke(commitRequest, localNode.id());
@@ -1836,7 +1843,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.schemaVersion(oldRow.schemaVersion())
.oldBinaryTuple(oldRow.tupleSlice())
.newBinaryTuple(newRow.tupleSlice())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.full(full)
@@ -1854,7 +1861,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.transactionId(targetTxId)
.indexToUse(sortedIndexStorage.id())
.exactKey(toIndexKey(FUTURE_SCHEMA_ROW_INDEXED_VALUE))
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(1)
.batchSize(100)
.commitPartitionId(commitPartitionId())
@@ -1873,7 +1880,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(targetTxId)
.indexToUse(sortedIndexStorage.id())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(1)
.batchSize(100)
.commitPartitionId(commitPartitionId())
@@ -1896,7 +1903,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
.transactionId(targetTxId)
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(1)
.batchSize(100)
.full(false)
@@ -2384,7 +2391,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(commitPartitionId)
.groups(groups)
.txId(txId)
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commit(true)
.commitTimestampLong(clock.nowLong())
.build(),
@@ -2524,15 +2531,20 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow row) {
+ return upsertAsync(txId, row, false); // Delegates to the three-argument overload with full = false.
+ }
+
+ private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow row, boolean full) {
ReadWriteSingleRowReplicaRequest message = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(grpId)
.requestType(RequestType.RW_UPSERT)
.transactionId(txId)
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
+ .full(full)
.build();
return partitionReplicaListener.invoke(message, localNode.id());
@@ -2545,7 +2557,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.transactionId(txId)
.schemaVersion(row.schemaVersion())
.primaryKey(row.tupleSlice())
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commitPartitionId(commitPartitionId())
.coordinatorId(localNode.id())
.build();
@@ -2599,7 +2611,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.requestType(RequestType.RO_GET_ALL)
.schemaVersion(rows.iterator().next().schemaVersion())
.primaryKeys(binaryRowsToBuffers(rows))
- .enlistmentConsistencyToken(1L)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build();
return partitionReplicaListener.invoke(request, localNode.id());
@@ -2782,7 +2794,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> invoke(UUID targetTxId, HybridTimestamp readTimestamp, TestKey key);
}
- @ParameterizedTest
+ @ParameterizedTest(name = "readOnly = {0}")
@ValueSource(booleans = {true, false})
void testStaleTxOperationAfterIndexStartBuilding(boolean readOnly) {
fireHashIndexStartBuildingEventForStaleTxOperation();
@@ -2801,6 +2813,61 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
}
+ @Test
+ void testBuildIndexReplicaRequestWithoutRwTxOperations() { // No RW upserts are started in this test.
+ CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(1, 1);
+
+ assertFalse(invokeBuildIndexReplicaRequestFuture.isDone()); // The request must not complete before the index start-building event is fired.
+
+ fireHashIndexStartBuildingEventForStaleTxOperation();
+
+ assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully()); // Completes once the event has been fired.
+ assertThat(invokeBuildIndexReplicaRequestAsync(1, 1), willCompleteSuccessfully()); // A repeated request for the same index also completes.
+ }
+
+ @ParameterizedTest(name = "failCmd = {0}")
+ @ValueSource(booleans = {false, true})
+ void testBuildIndexReplicaRequest(boolean failCmd) { // failCmd: whether the held-back raft commands are released with an error or normally.
+ var continueNotBuildIndexCmdFuture = new CompletableFuture<Void>(); // Gate that holds back every raft command except BuildIndexCommand.
+
+ when(mockRaftClient.run(any())).thenAnswer(invocation -> {
+ Command cmd = invocation.getArgument(0);
+
+ if (cmd instanceof BuildIndexCommand) {
+ return raftClientFutureClosure.apply(cmd); // BuildIndexCommand is processed immediately.
+ }
+
+ return continueNotBuildIndexCmdFuture.thenCompose(unused -> raftClientFutureClosure.apply(cmd)); // Every other command waits for the gate.
+ });
+
+ UUID txId = newTxId();
+ long beginTs = beginTimestamp(txId).longValue();
+
+ when(catalogService.activeCatalogVersion(eq(beginTs))).thenReturn(0); // Tx begin maps to catalog version 0, below the index creation version 1 used below.
+
+ BinaryRow row = binaryRow(0);
+
+ CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row, true); // RW upsert with full = true; its raft command is expected to be held by the gate.
+ CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(1, 1);
+
+ fireHashIndexStartBuildingEventForStaleTxOperation();
+
+ assertFalse(upsertFuture.isDone()); // The upsert is blocked on the gate.
+ assertFalse(invokeBuildIndexReplicaRequestFuture.isDone()); // The build-index request waits for the in-flight RW upsert.
+
+ if (failCmd) {
+ continueNotBuildIndexCmdFuture.completeExceptionally(new RuntimeException("error from test")); // Release the gate with a failure.
+
+ assertThat(upsertFuture, willThrow(RuntimeException.class));
+ } else {
+ continueNotBuildIndexCmdFuture.complete(null); // Release the gate normally.
+
+ assertThat(upsertFuture, willCompleteSuccessfully());
+ }
+
+ assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully()); // Completes whether the upsert succeeded or failed.
+ }
+
private void fireHashIndexStartBuildingEventForStaleTxOperation() {
CatalogHashIndexDescriptor hashIndexDescriptor = mock(CatalogHashIndexDescriptor.class);
@@ -2818,4 +2885,16 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
willCompleteSuccessfully()
);
}
+
+ private CompletableFuture<?> invokeBuildIndexReplicaRequestAsync(int indexId, int indexCreationCatalogVersion) { // Sends a BuildIndexReplicaRequest to the listener under test.
+ BuildIndexReplicaRequest request = TABLE_MESSAGES_FACTORY.buildIndexReplicaRequest()
+ .groupId(grpId)
+ .indexId(indexId)
+ .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
+ .creationCatalogVersion(indexCreationCatalogVersion)
+ .rowIds(List.of()) // Empty batch: no row IDs are passed.
+ .build();
+
+ return partitionReplicaListener.invoke(request, localNode.id());
+ }
}