You are viewing a plain-text version of this content. The canonical link to the original (a hyperlink on the word "here") is not preserved in this text.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/06/08 15:22:03 UTC

[ignite-3] branch main updated: IGNITE-19136 Handling timeout on waiting for replica readiness (#2144)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f071eb833 IGNITE-19136 Handling timeout on waiting for replica readiness (#2144)
3f071eb833 is described below

commit 3f071eb8332864f2b72a2d38a3f16b6cb69a9ed3
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Thu Jun 8 19:21:57 2023 +0400

    IGNITE-19136 Handling timeout on waiting for replica readiness (#2144)
---
 .../ignite/internal/replicator/ReplicaService.java | 28 ++++++----
 .../ignite/distributed/ReplicaUnavailableTest.java | 59 ++++++++++++++++++++--
 2 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index d9d1bc1fe8..2360a20a8b 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.replicator;
 
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -89,9 +91,7 @@ public class ReplicaService {
         // TODO: IGNITE-17824 Use named executor instead of default one in order to process replica Response.
         messagingService.invoke(targetNodeConsistentId, req, RPC_TIMEOUT).whenCompleteAsync((response, throwable) -> {
             if (throwable != null) {
-                if (throwable instanceof CompletionException) {
-                    throwable = throwable.getCause();
-                }
+                throwable = unwrapCause(throwable);
 
                 if (throwable instanceof TimeoutException) {
                     res.completeExceptionally(new ReplicationTimeoutException(req.groupId()));
@@ -128,17 +128,27 @@ public class ReplicaService {
                             pendingInvokes.remove(targetNodeConsistentId, awaitReplicaFut);
 
                             if (throwable0 != null) {
-                                if (throwable0 instanceof CompletionException) {
-                                    throwable0 = throwable0.getCause();
-                                }
+                                throwable0 = unwrapCause(throwable0);
 
                                 if (throwable0 instanceof TimeoutException) {
-                                    res.completeExceptionally(errResp.throwable());
+                                    res.completeExceptionally(withCause(
+                                            ReplicationTimeoutException::new,
+                                            REPLICA_TIMEOUT_ERR,
+                                            format(
+                                                    "Could not wait for the replica readiness due to timeout [replicaGroupId={}, req={}]",
+                                                    req.groupId(),
+                                                    req.getClass().getSimpleName()
+                                            ),
+                                            throwable0));
                                 } else {
                                     res.completeExceptionally(withCause(
                                             ReplicationException::new,
                                             REPLICA_COMMON_ERR,
-                                            "Failed to process replica request [replicaGroupId=" + req.groupId() + ']',
+                                            format(
+                                                    "Failed to process replica request [replicaGroupId={}, req={}]",
+                                                    req.groupId(),
+                                                    req.getClass().getSimpleName()
+                                            ),
                                             throwable0));
                                 }
                             } else {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index a6cfc6313b..ccda9d0124 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -22,6 +22,8 @@ import static org.apache.ignite.distributed.ItTxDistributedTestSingleNode.NODE_P
 import static org.apache.ignite.distributed.ItTxDistributedTestSingleNode.startNode;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
 import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,7 +45,8 @@ import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
-import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
@@ -184,7 +187,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId)
                 .timestampLong(clock.nowLong())
-                .binaryRow(createKeyValueRow(1L, 1L))
+                .binaryRowBytes(createKeyValueRow(1L, 1L))
                 .requestType(RequestType.RW_GET)
                 .build();
 
@@ -236,10 +239,58 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
         }
 
         assertTrue(e0 != null);
-        assertTrue(e0.getCause() instanceof ReplicaUnavailableException, e0.toString());
+        assertTrue(unwrapCause(e0) instanceof ReplicationException, e0.toString());
 
         assertTrue(e1 != null);
-        assertTrue(e1.getCause() instanceof ReplicaUnavailableException, e1.toString());
+        assertTrue(unwrapCause(e1) instanceof ReplicationException, e1.toString());
+    }
+
+    @Test
+    public void testWithNotReadyReplica() {
+        ClusterNode clusterNode = clusterService.topologyService().localMember();
+
+        TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+
+        clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
+                (message, sender, correlationId) -> {
+                    try {
+                        log.info("Replica msg " + message.getClass().getSimpleName());
+
+                        replicaManager.startReplica(
+                                tablePartitionId,
+                                new CompletableFuture<>(),
+                                request0 -> completedFuture(replicaMessageFactory.replicaResponse()
+                                        .result(Integer.valueOf(5))
+                                        .build()),
+                                mock(TopologyAwareRaftGroupService.class),
+                                new PendingComparableValuesTracker<>(0L)
+                        );
+                    } catch (NodeStoppingException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+        );
+
+        ReadWriteSingleRowReplicaRequest request = tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                .groupId(tablePartitionId)
+                .transactionId(TestTransactionIds.newTransactionId())
+                .commitPartitionId(tablePartitionId)
+                .timestampLong(clock.nowLong())
+                .binaryRowBytes(createKeyValueRow(1L, 1L))
+                .requestType(RequestType.RW_GET)
+                .build();
+
+        Exception e0 = null;
+
+        try {
+            replicaService.invoke(clusterNode, request).get(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            e0 = e;
+        }
+
+        assertTrue(e0 != null);
+        assertTrue(unwrapCause(e0) instanceof ReplicationTimeoutException, e0.toString());
+        assertEquals(REPLICA_TIMEOUT_ERR, ((ReplicationTimeoutException) unwrapCause(e0)).code());
     }
 
     private static ByteBuffer createKeyValueRow(long id, long value) {