You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/06/08 15:22:03 UTC
[ignite-3] branch main updated: IGNITE-19136 Handling timeout on waiting for replica readiness (#2144)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3f071eb833 IGNITE-19136 Handling timeout on waiting for replica readiness (#2144)
3f071eb833 is described below
commit 3f071eb8332864f2b72a2d38a3f16b6cb69a9ed3
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Thu Jun 8 19:21:57 2023 +0400
IGNITE-19136 Handling timeout on waiting for replica readiness (#2144)
---
.../ignite/internal/replicator/ReplicaService.java | 28 ++++++----
.../ignite/distributed/ReplicaUnavailableTest.java | 59 ++++++++++++++++++++--
2 files changed, 74 insertions(+), 13 deletions(-)
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index d9d1bc1fe8..2360a20a8b 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.replicator;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -89,9 +91,7 @@ public class ReplicaService {
// TODO: IGNITE-17824 Use named executor instead of default one in order to process replica Response.
messagingService.invoke(targetNodeConsistentId, req, RPC_TIMEOUT).whenCompleteAsync((response, throwable) -> {
if (throwable != null) {
- if (throwable instanceof CompletionException) {
- throwable = throwable.getCause();
- }
+ throwable = unwrapCause(throwable);
if (throwable instanceof TimeoutException) {
res.completeExceptionally(new ReplicationTimeoutException(req.groupId()));
@@ -128,17 +128,27 @@ public class ReplicaService {
pendingInvokes.remove(targetNodeConsistentId, awaitReplicaFut);
if (throwable0 != null) {
- if (throwable0 instanceof CompletionException) {
- throwable0 = throwable0.getCause();
- }
+ throwable0 = unwrapCause(throwable0);
if (throwable0 instanceof TimeoutException) {
- res.completeExceptionally(errResp.throwable());
+ res.completeExceptionally(withCause(
+ ReplicationTimeoutException::new,
+ REPLICA_TIMEOUT_ERR,
+ format(
+ "Could not wait for the replica readiness due to timeout [replicaGroupId={}, req={}]",
+ req.groupId(),
+ req.getClass().getSimpleName()
+ ),
+ throwable0));
} else {
res.completeExceptionally(withCause(
ReplicationException::new,
REPLICA_COMMON_ERR,
- "Failed to process replica request [replicaGroupId=" + req.groupId() + ']',
+ format(
+ "Failed to process replica request [replicaGroupId={}, req={}]",
+ req.groupId(),
+ req.getClass().getSimpleName()
+ ),
throwable0));
}
} else {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index a6cfc6313b..ccda9d0124 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -22,6 +22,8 @@ import static org.apache.ignite.distributed.ItTxDistributedTestSingleNode.NODE_P
import static org.apache.ignite.distributed.ItTxDistributedTestSingleNode.startNode;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,7 +45,8 @@ import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
-import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
@@ -184,7 +187,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId)
.timestampLong(clock.nowLong())
- .binaryRow(createKeyValueRow(1L, 1L))
+ .binaryRowBytes(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
@@ -236,10 +239,58 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
}
assertTrue(e0 != null);
- assertTrue(e0.getCause() instanceof ReplicaUnavailableException, e0.toString());
+ assertTrue(unwrapCause(e0) instanceof ReplicationException, e0.toString());
assertTrue(e1 != null);
- assertTrue(e1.getCause() instanceof ReplicaUnavailableException, e1.toString());
+ assertTrue(unwrapCause(e1) instanceof ReplicationException, e1.toString());
+ }
+
+ @Test
+ public void testWithNotReadyReplica() {
+ ClusterNode clusterNode = clusterService.topologyService().localMember();
+
+ TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+
+ clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
+ (message, sender, correlationId) -> {
+ try {
+ log.info("Replica msg " + message.getClass().getSimpleName());
+
+ replicaManager.startReplica(
+ tablePartitionId,
+ new CompletableFuture<>(),
+ request0 -> completedFuture(replicaMessageFactory.replicaResponse()
+ .result(Integer.valueOf(5))
+ .build()),
+ mock(TopologyAwareRaftGroupService.class),
+ new PendingComparableValuesTracker<>(0L)
+ );
+ } catch (NodeStoppingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+
+ ReadWriteSingleRowReplicaRequest request = tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ .groupId(tablePartitionId)
+ .transactionId(TestTransactionIds.newTransactionId())
+ .commitPartitionId(tablePartitionId)
+ .timestampLong(clock.nowLong())
+ .binaryRowBytes(createKeyValueRow(1L, 1L))
+ .requestType(RequestType.RW_GET)
+ .build();
+
+ Exception e0 = null;
+
+ try {
+ replicaService.invoke(clusterNode, request).get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e0 = e;
+ }
+
+ assertTrue(e0 != null);
+ assertTrue(unwrapCause(e0) instanceof ReplicationTimeoutException, e0.toString());
+ assertEquals(REPLICA_TIMEOUT_ERR, ((ReplicationTimeoutException) unwrapCause(e0)).code());
}
private static ByteBuffer createKeyValueRow(long id, long value) {