You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/08/11 05:43:02 UTC

incubator-ratis git commit: RATIS-97. Pass RaftGroup to reinitialize. Contributed by Tsz Wo Nicholas Sze.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 812d7dbeb -> 0235de0ec


RATIS-97. Pass RaftGroup to reinitialize. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0235de0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0235de0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0235de0e

Branch: refs/heads/master
Commit: 0235de0ececb0c647858ede3228d25f4928df712
Parents: 812d7db
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Aug 10 22:42:43 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Aug 10 22:42:43 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  4 ++--
 .../ratis/client/impl/ClientProtoUtils.java     |  6 ++---
 .../ratis/client/impl/RaftClientImpl.java       | 17 +++++++------
 .../org/apache/ratis/protocol/RaftGroup.java    | 10 +++++++-
 .../ratis/protocol/ReinitializeRequest.java     | 10 ++++----
 .../java/org/apache/ratis/util/NetUtils.java    |  6 ++++-
 .../java/org/apache/ratis/util/ProtoUtils.java  | 15 ++++++++----
 ratis-proto-shaded/src/main/proto/Raft.proto    |  2 +-
 .../org/apache/ratis/server/RaftServer.java     |  2 +-
 .../ratis/server/impl/RaftServerProxy.java      |  4 ++--
 .../server/impl/ReinitializationBaseTest.java   | 25 ++++++++++----------
 11 files changed, 61 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 1a7faf6..44fc186 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -51,8 +51,8 @@ public interface RaftClient extends Closeable {
   /** Send set configuration request to the raft service. */
   RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
 
-  /** Send reinitialize request to the service. */
-  RaftClientReply reinitialize(RaftPeer[] serversInNewConf, RaftPeerId server) throws IOException;
+  /** Send reinitialize request to the given server (not the raft service). */
+  RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server) throws IOException;
 
   /** @return a {@link Builder}. */
   static Builder newBuilder() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 2968884..bfb25be 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -212,19 +212,19 @@ public class ClientProtoUtils {
   public static ReinitializeRequest toReinitializeRequest(
       ReinitializeRequestProto p) {
     final RaftRpcRequestProto m = p.getRpcRequest();
-    final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList());
     return new ReinitializeRequest(
         new ClientId(m.getRequestorId()),
         RaftPeerId.valueOf(m.getReplyId()),
         ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
-        p.getRpcRequest().getCallId(), peers);
+        m.getCallId(),
+        ProtoUtils.toRaftGroup(p.getGroup()));
   }
 
   public static ReinitializeRequestProto toReinitializeRequestProto(
       ReinitializeRequest request) {
     return ReinitializeRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
-        .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInGroup().getPeers()))
+        .setGroup(ProtoUtils.toRaftGroupProtoBuilder(request.getGroup()))
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index c7ad935..69483ef 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.shaded.com.google.common.base.Predicates;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -29,9 +30,11 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /** A client who sends requests to a raft service. */
 final class RaftClientImpl implements RaftClient {
@@ -89,23 +92,23 @@ final class RaftClientImpl implements RaftClient {
       throws IOException {
     final long callId = nextCallId();
     // also refresh the rpc proxies for these peers
-    addServers(peersInNewConf);
+    addServers(Arrays.stream(peersInNewConf));
     return sendRequestWithRetry(() -> new SetConfigurationRequest(
         clientId, leaderId, groupId, callId, peersInNewConf));
   }
 
   @Override
-  public RaftClientReply reinitialize(RaftPeer[] peersInNewConf, RaftPeerId server)
+  public RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server)
       throws IOException {
     final long callId = nextCallId();
-    addServers(peersInNewConf);
+    addServers(newGroup.getPeers().stream());
     return sendRequest(new ReinitializeRequest(
-        clientId, server, groupId, callId, peersInNewConf));
+        clientId, server, groupId, callId, newGroup));
   }
 
-  private void addServers(RaftPeer[] peersInNewConf) {
-    clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains)
-        .collect(Collectors.toList()));
+  private void addServers(Stream<RaftPeer> peersInNewConf) {
+    clientRpc.addServers(
+        peersInNewConf.filter(p -> !peers.contains(p))::iterator);
   }
 
   private RaftClientReply sendRequestWithRetry(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index 6e870b1..d00bed5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -34,10 +34,18 @@ public class RaftGroup {
   /** The group of raft peers */
   private final List<RaftPeer> peers;
 
+  public RaftGroup(RaftGroupId groupId) {
+    this(groupId, Collections.emptyList());
+  }
+
   public RaftGroup(RaftGroupId groupId, RaftPeer[] peers) {
+    this(groupId, Arrays.asList(peers));
+  }
+
+  public RaftGroup(RaftGroupId groupId, List<RaftPeer> peers) {
     Preconditions.assertTrue(peers != null);
     this.groupId = groupId;
-    this.peers = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(peers)));
+    this.peers = Collections.unmodifiableList(new ArrayList<>(peers));
   }
 
   public RaftGroupId getGroupId() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
index b69c845..3c6d468 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
@@ -17,23 +17,21 @@
  */
 package org.apache.ratis.protocol;
 
-import java.util.Arrays;
-
 public class ReinitializeRequest extends RaftClientRequest {
   private final RaftGroup group;
 
   public ReinitializeRequest(ClientId clientId, RaftPeerId serverId,
-      RaftGroupId groupId, long callId, RaftPeer[] peers) {
+      RaftGroupId groupId, long callId, RaftGroup group) {
     super(clientId, serverId, groupId, callId, null);
-    this.group = new RaftGroup(groupId, peers);
+    this.group = group;
   }
 
-  public RaftGroup getPeersInGroup() {
+  public RaftGroup getGroup() {
     return group;
   }
 
   @Override
   public String toString() {
-    return super.toString() + ", peers:" + Arrays.asList(getPeersInGroup());
+    return super.toString() + ", " + getGroup();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
index 86dd865..9d97f4d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
@@ -123,6 +123,10 @@ public interface NetUtils {
   }
 
   static String address2String(InetSocketAddress address) {
-    return address.getHostName() + ":" + address.getPort();
+    final StringBuilder b = new StringBuilder(address.getHostName());
+    if (address.getAddress() instanceof Inet6Address) {
+      b.insert(0, '[').append(']');
+    }
+    return b.append(':').append(address.getPort()).toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 9a2d530..7e849fd 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -17,10 +17,7 @@
  */
 package org.apache.ratis.util;
 
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
@@ -111,6 +108,16 @@ public class ProtoUtils {
     return RaftGroupIdProto.newBuilder().setId(id.toByteString());
   }
 
+  public static RaftGroup toRaftGroup(RaftGroupProto proto) {
+    return new RaftGroup(toRaftGroupId(proto.getGroupId()),
+        toRaftPeerArray(proto.getPeersList()));
+  }
+
+  public static RaftGroupProto.Builder toRaftGroupProtoBuilder(RaftGroup group) {
+    return RaftGroupProto.newBuilder()
+        .setGroupId(toRaftGroupIdProtoBuilder(group.getGroupId()))
+        .addAllPeers(toRaftPeerProtos(group.getPeers()));
+  }
 
   public static boolean isConfigurationLogEntry(LogEntryProto entry) {
     return entry.getLogEntryBodyCase() ==

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index ed145df..4dada3b 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -195,5 +195,5 @@ message SetConfigurationRequestProto {
 // reinitialize request
 message ReinitializeRequestProto {
   RaftRpcRequestProto rpcRequest = 1;
-  repeated RaftPeerProto peers = 2;
+  RaftGroupProto group = 2; // the target group.
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index d5a46f9..622e75a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -88,7 +88,7 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
     }
 
     /** Set all the peers (including the server being built) in the Raft cluster. */
-    public Builder setPeers(RaftGroup group) {
+    public Builder setGroup(RaftGroup group) {
       this.group = group;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 4b3b9f5..9be1e9a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -169,7 +169,7 @@ public class RaftServerProxy implements RaftServer {
 
         final RaftServerImpl newImpl;
         try {
-          newImpl = initImpl(request.getPeersInGroup());
+          newImpl = initImpl(request.getGroup());
         } catch (IOException ioe) {
           final RaftException re = new RaftException(
               "Failed to reinitialize, request=" + request, ioe);
@@ -178,7 +178,7 @@ public class RaftServerProxy implements RaftServer {
           return new RaftClientReply(request, re);
         }
 
-        getServerRpc().addPeers(request.getPeersInGroup().getPeers());
+        getServerRpc().addPeers(request.getGroup().getPeers());
         newImpl.start();
         impl.complete(newImpl);
         return new RaftClientReply(request, (Message) null);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
index 20b8680..a4ac287 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
@@ -66,7 +66,7 @@ public abstract class ReinitializationBaseTest {
 
     // Start server with an empty conf
     final RaftGroupId groupId = RaftGroupId.createId();
-    final RaftGroup group = new RaftGroup(groupId, RaftPeer.EMPTY_PEERS);
+    final RaftGroup group = new RaftGroup(groupId);
 
     final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
         .map(RaftPeerId::valueOf).collect(Collectors.toList());
@@ -81,11 +81,10 @@ public abstract class ReinitializationBaseTest {
     Assert.assertNull(cluster.getLeader());
 
     // Reinitialize servers
-    final RaftPeer[] peers = cluster.getPeers().toArray(RaftPeer.EMPTY_PEERS);
-    for(RaftPeer p : peers) {
-      final RaftClient client = cluster.createClient(p.getId(),
-          new RaftGroup(groupId, new RaftPeer[]{p}));
-      client.reinitialize(peers, p.getId());
+    final RaftGroup newGroup = new RaftGroup(groupId, cluster.getPeers());
+    final RaftClient client = cluster.createClient(null, newGroup);
+    for(RaftPeer p : newGroup.getPeers()) {
+      client.reinitialize(newGroup, p.getId());
     }
     Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
     cluster.shutdown();
@@ -106,7 +105,7 @@ public abstract class ReinitializationBaseTest {
   @Test
   public void testReinitialize9Nodes() throws Exception {
     final int[] idIndex = {5, 8, 9};
-    runTestReinitializeMultiGroups(idIndex, 0);
+    runTestReinitializeMultiGroups(idIndex, 2);
   }
 
   private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception {
@@ -121,7 +120,7 @@ public abstract class ReinitializationBaseTest {
     LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers());
 
     // Start server with an empty conf
-    final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.createId(), RaftPeer.EMPTY_PEERS);
+    final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.createId());
 
     final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0))
         .map(RaftPeerId::valueOf).collect(Collectors.toList());
@@ -147,8 +146,8 @@ public abstract class ReinitializationBaseTest {
 
       LOG.info(i + ") starting " + groups[i]);
       for(RaftPeer p : peers) {
-        try(final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
-          client.reinitialize(peers, p.getId());
+        try(final RaftClient client = cluster.createClient(p.getId(), emptyGroup)) {
+          client.reinitialize(groups[i], p.getId());
         }
       }
       Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid));
@@ -162,10 +161,11 @@ public abstract class ReinitializationBaseTest {
     for (int i = 0; i < groups.length; i++) {
       if (i != chosen) {
         final RaftGroup g = groups[i];
+        final RaftGroup newGroup = new RaftGroup(g.getGroupId());
         LOG.info(i + ") close " + cluster.printServers(g.getGroupId()));
         for(RaftPeer p : g.getPeers()) {
           try (final RaftClient client = cluster.createClient(p.getId(), g)) {
-            client.reinitialize(RaftPeer.EMPTY_PEERS, p.getId());
+            client.reinitialize(newGroup, p.getId());
           }
         }
       }
@@ -174,6 +174,7 @@ public abstract class ReinitializationBaseTest {
     LOG.info("close groups: " + cluster.printServers());
 
     // update chosen group to use all the peers
+    final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId());
     final RaftPeer[] array = allPeers.toArray(RaftPeer.EMPTY_PEERS);
     for(int i = 0; i < groups.length; i++) {
       LOG.info(i + ") update " + cluster.printServers(groups[i].getGroupId()));
@@ -184,7 +185,7 @@ public abstract class ReinitializationBaseTest {
       } else {
         for(RaftPeer p : groups[i].getPeers()) {
           try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
-            client.reinitialize(array, p.getId());
+            client.reinitialize(newGroup, p.getId());
           }
         }
       }