You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2022/05/20 10:43:53 UTC

[ratis] branch master updated: RATIS-1584. Check if the group is empty in RaftClientImpl. (#648)

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new afd33f77 RATIS-1584. Check if the group is empty in RaftClientImpl. (#648)
afd33f77 is described below

commit afd33f77c5e080022ff86725f66c66ae4d9e8d06
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri May 20 18:43:47 2022 +0800

    RATIS-1584. Check if the group is empty in RaftClientImpl. (#648)
---
 .../org/apache/ratis/client/impl/OrderedAsync.java |  4 +-
 .../apache/ratis/client/impl/RaftClientImpl.java   | 50 ++++++++++++++--------
 .../test/java/org/apache/ratis/RaftAsyncTests.java |  5 ++-
 .../ratis/server/impl/GroupManagementBaseTest.java |  2 +-
 4 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 8a530d9b..5bd85698 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -272,7 +272,7 @@ public final class OrderedAsync {
   }
 
   void assertRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) {
-    Preconditions.assertTrue(requestSemaphore.availablePermits() == expectedAvailablePermits);
-    Preconditions.assertTrue(requestSemaphore.getQueueLength() == expectedQueueLength);
+    Preconditions.assertSame(expectedAvailablePermits, requestSemaphore.availablePermits(), "availablePermits");
+    Preconditions.assertSame(expectedQueueLength, requestSemaphore.getQueueLength(), "queueLength");
   }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 75871bf9..7313c914 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -46,6 +46,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutScheduler;
 
@@ -121,6 +122,7 @@ public final class RaftClientImpl implements RaftClient {
     }
 
     void set(Collection<RaftPeer> newPeers) {
+      Preconditions.assertTrue(!newPeers.isEmpty(), "newPeers is empty.");
       list.set(Collections.unmodifiableList(new ArrayList<>(newPeers)));
     }
   }
@@ -154,13 +156,8 @@ public final class RaftClientImpl implements RaftClient {
     this.peers.set(group.getPeers());
     this.groupId = group.getGroupId();
 
-    if (leaderId == null) {
-      final RaftPeerId cached = LEADER_CACHE.getIfPresent(groupId);
-      if (cached != null && group.getPeer(cached) != null) {
-        leaderId = cached;
-      }
-    }
-    this.leaderId = leaderId != null? leaderId : getHighestPriorityPeerId();
+    this.leaderId = Objects.requireNonNull(computeLeaderId(leaderId, group),
+        () -> "this.leaderId is set to null, leaderId=" + leaderId + ", group=" + group);
     this.retryPolicy = Objects.requireNonNull(retryPolicy, "retry policy can't be null");
 
     clientRpc.addRaftPeers(group.getPeers());
@@ -180,6 +177,7 @@ public final class RaftClientImpl implements RaftClient {
     this.adminApi = JavaUtils.memoize(() -> new AdminImpl(this));
   }
 
+  @Override
   public RaftPeerId getLeaderId() {
     return leaderId;
   }
@@ -188,17 +186,31 @@ public final class RaftClientImpl implements RaftClient {
     return groupId;
   }
 
-  private RaftPeerId getHighestPriorityPeerId() {
-    int maxPriority = Integer.MIN_VALUE;
-    RaftPeerId highestPriorityPeerId = null;
-    for (RaftPeer peer : peers) {
-      if (maxPriority < peer.getPriority()) {
-        maxPriority = peer.getPriority();
-        highestPriorityPeerId = peer.getId();
-      }
+  private static RaftPeerId computeLeaderId(RaftPeerId leaderId, RaftGroup group) {
+    if (leaderId != null) {
+      return leaderId;
+    }
+    final RaftPeerId cached = LEADER_CACHE.getIfPresent(group.getGroupId());
+    if (cached != null && group.getPeer(cached) != null) {
+      return cached;
+    }
+    return getHighestPriorityPeer(group).getId();
+  }
+
+  private static RaftPeer getHighestPriorityPeer(RaftGroup group) {
+    final Iterator<RaftPeer> i = group.getPeers().iterator();
+    if (!i.hasNext()) {
+      throw new IllegalArgumentException("Group peers is empty in " + group);
     }
 
-    return highestPriorityPeerId;
+    RaftPeer highest = i.next();
+    for(; i.hasNext(); ) {
+      final RaftPeer peer = i.next();
+      if (peer.getPriority() > highest.getPriority()) {
+        highest = peer;
+      }
+    }
+    return highest;
   }
 
   @Override
@@ -230,7 +242,7 @@ public final class RaftClientImpl implements RaftClient {
     if (server != null) {
       b.setServerId(server);
     } else {
-      b.setLeaderId(leaderId);
+      b.setLeaderId(getLeaderId());
     }
     return b.setClientId(clientId)
         .setGroupId(groupId)
@@ -365,7 +377,7 @@ public final class RaftClientImpl implements RaftClient {
     }
 
     final RaftPeerId oldLeader = request.getServerId();
-    final RaftPeerId curLeader = leaderId;
+    final RaftPeerId curLeader = getLeaderId();
     final boolean stillLeader = oldLeader.equals(curLeader);
     if (newLeader == null && stillLeader) {
       newLeader = CollectionUtils.random(oldLeader,
@@ -376,7 +388,7 @@ public final class RaftClientImpl implements RaftClient {
     final boolean changeLeader = newLeader != null && stillLeader;
     final boolean reconnect = changeLeader || clientRpc.shouldReconnect(ioe);
     if (reconnect) {
-      if (changeLeader && oldLeader.equals(leaderId)) {
+      if (changeLeader && oldLeader.equals(getLeaderId())) {
         LOG.debug("{} {}: client change Leader from {} to {} ex={}", groupId,
             clientId, oldLeader, newLeader, ioe.getClass().getName());
         this.leaderId = newLeader;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 598faa3a..691ca90f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -25,6 +25,8 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -87,8 +89,9 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     LOG.info("Running testAsyncConfiguration");
     final RaftProperties properties = new RaftProperties();
     RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(properties, false);
+    final RaftPeer server = RaftPeer.newBuilder().setId("s0").build();
     RaftClient.Builder clientBuilder = RaftClient.newBuilder()
-        .setRaftGroup(RaftGroup.emptyGroup())
+        .setRaftGroup(RaftGroup.valueOf(RaftGroupId.randomId(), server))
         .setProperties(properties);
     int maxOutstandingRequests = RaftClientConfigKeys.Async.OUTSTANDING_REQUESTS_MAX_DEFAULT;
     try(RaftClient client = clientBuilder.build()) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 7fcd6fcb..d15b3265 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -276,7 +276,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
 
       LOG.info(i + ") starting " + groups[i]);
       for(RaftPeer p : peers) {
-        try(final RaftClient client = cluster.createClient(p.getId(), emptyGroup)) {
+        try(final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
           client.getGroupManagementApi(p.getId()).add(groups[i]);
         }
       }