You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original message) was not preserved in this copy.
Posted to commits@ignite.apache.org by vp...@apache.org on 2024/02/01 20:36:27 UTC

(ignite-3) branch main updated: IGNITE-21394 TimeoutException in the listener of pending assignments change shouldn't fail the watch processor (#3139)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d1cb712f06 IGNITE-21394 TimeoutException in the listener of pending assignments change shouldn't fail the watch processor (#3139)
d1cb712f06 is described below

commit d1cb712f0667f80d9896ca15f39939ba00afdaca
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Thu Feb 1 23:36:21 2024 +0300

    IGNITE-21394 TimeoutException in the listener of pending assignments change shouldn't fail the watch processor (#3139)
---
 .../internal/raft/service/LeaderWithTerm.java      | 14 ++++++++++++-
 .../ignite/internal/raft/RaftGroupServiceImpl.java |  4 ++++
 .../raft/client/TopologyAwareRaftGroupService.java |  4 ++--
 .../internal/table/distributed/TableManager.java   | 24 ++++++++++++++++++----
 .../PartitionReplicaListenerIndexLockingTest.java  |  2 +-
 5 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
index 16133ebe2d..d22ae5e091 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
@@ -24,16 +24,28 @@ import org.jetbrains.annotations.Nullable;
  * Class representing a Raft group leader and its term.
  */
 public class LeaderWithTerm {
+    /** The instance determines a state where the leader is undefined. */
+    public static LeaderWithTerm NO_LEADER = new LeaderWithTerm(null, -1);
+
     @Nullable
     private final Peer leader;
 
     private final long term;
 
-    public LeaderWithTerm(@Nullable Peer leader, long term) {
+    public LeaderWithTerm(Peer leader, long term) {
         this.leader = leader;
         this.term = term;
     }
 
+    /**
+     * Checks if there is any useful information.
+     *
+     * @return True if the instance does not contain useful data, false otherwise.
+     */
+    public boolean isEmpty() {
+        return leader == null;
+    }
+
     public @Nullable Peer leader() {
         return leader;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 1441ca84d7..2fc35559b7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -244,6 +244,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
         return this.<GetLeaderResponse>sendWithRetry(randomNode(), requestFactory)
                 .thenApply(resp -> {
+                    if (resp.leaderId() == null) {
+                        return LeaderWithTerm.NO_LEADER;
+                    }
+
                     Peer respLeader = parsePeer(resp.leaderId());
 
                     this.leader = respLeader;
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 8225dbbcf7..d9ba43cd5f 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -140,7 +140,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
                                     if (subscribed) {
                                         return refreshAndGetLeaderWithTerm()
                                                 .thenAcceptAsync(leaderWithTerm -> {
-                                                    if (leaderWithTerm.leader() != null
+                                                    if (!leaderWithTerm.isEmpty()
                                                             && appearedNode.name().equals(leaderWithTerm.leader().consistentId())) {
                                                         serverEventHandler.onLeaderElected(appearedNode, leaderWithTerm.term());
                                                     }
@@ -310,7 +310,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
                 }
 
                 refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> {
-                    if (leaderWithTerm.leader() != null) {
+                    if (!leaderWithTerm.isEmpty()) {
                         serverEventHandler.onLeaderElected(
                                 clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId()),
                                 leaderWithTerm.term()
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 0e1bbc4f21..fde9dd7aeb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -26,6 +26,7 @@ import static java.util.concurrent.CompletableFuture.anyOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
@@ -45,6 +46,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFu
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.nio.file.Path;
@@ -71,7 +73,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.IntSupplier;
 import java.util.function.LongFunction;
 import java.util.function.Supplier;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -121,6 +123,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
@@ -1509,7 +1512,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     @Override
     public CompletableFuture<Table> tableAsync(String name) {
         return tableAsyncInternal(IgniteNameUtils.parseSimpleName(name))
-                .thenApply(Function.identity());
+                .thenApply(identity());
     }
 
     /**
@@ -1766,7 +1769,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                         busyLock.leaveBusy();
                     }
                 })
-                .thenCompose(Function.identity());
+                .thenCompose(identity());
     }
 
     private CompletableFuture<Void> handleChangePendingAssignmentEvent(
@@ -1831,8 +1834,21 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         RaftGroupService partGrpSvc = table.internalTable().partitionRaftGroupService(partId);
 
         return partGrpSvc.refreshAndGetLeaderWithTerm()
+                .exceptionally(throwable -> {
+                    if (throwable instanceof TimeoutException) {
+                        LOG.info("Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].", replicaGrpId);
+
+                        return LeaderWithTerm.NO_LEADER;
+                    }
+
+                    throw new IgniteInternalException(
+                            INTERNAL_ERR,
+                            "Failed to get a leader for the RAFT replication group [get=" + replicaGrpId + "].",
+                            throwable
+                    );
+                })
                 .thenCompose(leaderWithTerm -> {
-                    if (!isLocalPeer(leaderWithTerm.leader())) {
+                    if (leaderWithTerm.isEmpty() || !isLocalPeer(leaderWithTerm.leader())) {
                         return nullCompletedFuture();
                     }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 6401d2977a..7e1c403ff3 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -145,7 +145,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
         RaftGroupService mockRaftClient = mock(RaftGroupService.class);
 
         when(mockRaftClient.refreshAndGetLeaderWithTerm())
-                .thenAnswer(invocationOnMock -> completedFuture(new LeaderWithTerm(null, 1L)));
+                .thenAnswer(invocationOnMock -> completedFuture(LeaderWithTerm.NO_LEADER));
         when(mockRaftClient.run(any()))
                 .thenAnswer(invocationOnMock -> nullCompletedFuture());