You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2024/02/01 20:36:27 UTC
(ignite-3) branch main updated: IGNITE-21394 TimeoutException in the listener of pending assignments change shouldn't fail the watch processor (#3139)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d1cb712f06 IGNITE-21394 TimeoutException in the listener of pending assignments change shouldn't fail the watch processor (#3139)
d1cb712f06 is described below
commit d1cb712f0667f80d9896ca15f39939ba00afdaca
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Thu Feb 1 23:36:21 2024 +0300
IGNITE-21394 TimeoutException in the listener of pending assignments change shouldn't fail the watch processor (#3139)
---
.../internal/raft/service/LeaderWithTerm.java | 14 ++++++++++++-
.../ignite/internal/raft/RaftGroupServiceImpl.java | 4 ++++
.../raft/client/TopologyAwareRaftGroupService.java | 4 ++--
.../internal/table/distributed/TableManager.java | 24 ++++++++++++++++++----
.../PartitionReplicaListenerIndexLockingTest.java | 2 +-
5 files changed, 40 insertions(+), 8 deletions(-)
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
index 16133ebe2d..d22ae5e091 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
@@ -24,16 +24,28 @@ import org.jetbrains.annotations.Nullable;
* Class representing a Raft group leader and its term.
*/
public class LeaderWithTerm {
+ /** The instance determines a state where the leader is undefined. */
+ public static LeaderWithTerm NO_LEADER = new LeaderWithTerm(null, -1);
+
@Nullable
private final Peer leader;
private final long term;
- public LeaderWithTerm(@Nullable Peer leader, long term) {
+ public LeaderWithTerm(Peer leader, long term) {
this.leader = leader;
this.term = term;
}
+ /**
+ * Checks if there is any useful information.
+ *
+ * @return True if the instance does not contain useful data, false otherwise.
+ */
+ public boolean isEmpty() {
+ return leader == null;
+ }
+
public @Nullable Peer leader() {
return leader;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 1441ca84d7..2fc35559b7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -244,6 +244,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
return this.<GetLeaderResponse>sendWithRetry(randomNode(), requestFactory)
.thenApply(resp -> {
+ if (resp.leaderId() == null) {
+ return LeaderWithTerm.NO_LEADER;
+ }
+
Peer respLeader = parsePeer(resp.leaderId());
this.leader = respLeader;
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 8225dbbcf7..d9ba43cd5f 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -140,7 +140,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
if (subscribed) {
return refreshAndGetLeaderWithTerm()
.thenAcceptAsync(leaderWithTerm -> {
- if (leaderWithTerm.leader() != null
+ if (!leaderWithTerm.isEmpty()
&& appearedNode.name().equals(leaderWithTerm.leader().consistentId())) {
serverEventHandler.onLeaderElected(appearedNode, leaderWithTerm.term());
}
@@ -310,7 +310,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
}
refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> {
- if (leaderWithTerm.leader() != null) {
+ if (!leaderWithTerm.isEmpty()) {
serverEventHandler.onLeaderElected(
clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId()),
leaderWithTerm.term()
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 0e1bbc4f21..fde9dd7aeb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -26,6 +26,7 @@ import static java.util.concurrent.CompletableFuture.anyOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
@@ -45,6 +46,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFu
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.nio.file.Path;
@@ -71,7 +73,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -121,6 +123,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
@@ -1509,7 +1512,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
@Override
public CompletableFuture<Table> tableAsync(String name) {
return tableAsyncInternal(IgniteNameUtils.parseSimpleName(name))
- .thenApply(Function.identity());
+ .thenApply(identity());
}
/**
@@ -1766,7 +1769,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
busyLock.leaveBusy();
}
})
- .thenCompose(Function.identity());
+ .thenCompose(identity());
}
private CompletableFuture<Void> handleChangePendingAssignmentEvent(
@@ -1831,8 +1834,21 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
RaftGroupService partGrpSvc = table.internalTable().partitionRaftGroupService(partId);
return partGrpSvc.refreshAndGetLeaderWithTerm()
+ .exceptionally(throwable -> {
+ if (throwable instanceof TimeoutException) {
+ LOG.info("Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].", replicaGrpId);
+
+ return LeaderWithTerm.NO_LEADER;
+ }
+
+ throw new IgniteInternalException(
+ INTERNAL_ERR,
+ "Failed to get a leader for the RAFT replication group [get=" + replicaGrpId + "].",
+ throwable
+ );
+ })
.thenCompose(leaderWithTerm -> {
- if (!isLocalPeer(leaderWithTerm.leader())) {
+ if (leaderWithTerm.isEmpty() || !isLocalPeer(leaderWithTerm.leader())) {
return nullCompletedFuture();
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 6401d2977a..7e1c403ff3 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -145,7 +145,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
RaftGroupService mockRaftClient = mock(RaftGroupService.class);
when(mockRaftClient.refreshAndGetLeaderWithTerm())
- .thenAnswer(invocationOnMock -> completedFuture(new LeaderWithTerm(null, 1L)));
+ .thenAnswer(invocationOnMock -> completedFuture(LeaderWithTerm.NO_LEADER));
when(mockRaftClient.run(any()))
.thenAnswer(invocationOnMock -> nullCompletedFuture());