You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/08/11 05:43:02 UTC
incubator-ratis git commit: RATIS-97. Pass RaftGroup to reinitialize.
Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 812d7dbeb -> 0235de0ec
RATIS-97. Pass RaftGroup to reinitialize. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0235de0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0235de0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0235de0e
Branch: refs/heads/master
Commit: 0235de0ececb0c647858ede3228d25f4928df712
Parents: 812d7db
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Aug 10 22:42:43 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Aug 10 22:42:43 2017 -0700
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 4 ++--
.../ratis/client/impl/ClientProtoUtils.java | 6 ++---
.../ratis/client/impl/RaftClientImpl.java | 17 +++++++------
.../org/apache/ratis/protocol/RaftGroup.java | 10 +++++++-
.../ratis/protocol/ReinitializeRequest.java | 10 ++++----
.../java/org/apache/ratis/util/NetUtils.java | 6 ++++-
.../java/org/apache/ratis/util/ProtoUtils.java | 15 ++++++++----
ratis-proto-shaded/src/main/proto/Raft.proto | 2 +-
.../org/apache/ratis/server/RaftServer.java | 2 +-
.../ratis/server/impl/RaftServerProxy.java | 4 ++--
.../server/impl/ReinitializationBaseTest.java | 25 ++++++++++----------
11 files changed, 61 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 1a7faf6..44fc186 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -51,8 +51,8 @@ public interface RaftClient extends Closeable {
/** Send set configuration request to the raft service. */
RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
- /** Send reinitialize request to the service. */
- RaftClientReply reinitialize(RaftPeer[] serversInNewConf, RaftPeerId server) throws IOException;
+ /** Send reinitialize request to the given server (not the raft service). */
+ RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server) throws IOException;
/** @return a {@link Builder}. */
static Builder newBuilder() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 2968884..bfb25be 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -212,19 +212,19 @@ public class ClientProtoUtils {
public static ReinitializeRequest toReinitializeRequest(
ReinitializeRequestProto p) {
final RaftRpcRequestProto m = p.getRpcRequest();
- final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList());
return new ReinitializeRequest(
new ClientId(m.getRequestorId()),
RaftPeerId.valueOf(m.getReplyId()),
ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
- p.getRpcRequest().getCallId(), peers);
+ m.getCallId(),
+ ProtoUtils.toRaftGroup(p.getGroup()));
}
public static ReinitializeRequestProto toReinitializeRequestProto(
ReinitializeRequest request) {
return ReinitializeRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(request))
- .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInGroup().getPeers()))
+ .setGroup(ProtoUtils.toRaftGroupProtoBuilder(request.getGroup()))
.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index c7ad935..69483ef 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.shaded.com.google.common.base.Predicates;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.TimeDuration;
@@ -29,9 +30,11 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/** A client who sends requests to a raft service. */
final class RaftClientImpl implements RaftClient {
@@ -89,23 +92,23 @@ final class RaftClientImpl implements RaftClient {
throws IOException {
final long callId = nextCallId();
// also refresh the rpc proxies for these peers
- addServers(peersInNewConf);
+ addServers(Arrays.stream(peersInNewConf));
return sendRequestWithRetry(() -> new SetConfigurationRequest(
clientId, leaderId, groupId, callId, peersInNewConf));
}
@Override
- public RaftClientReply reinitialize(RaftPeer[] peersInNewConf, RaftPeerId server)
+ public RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server)
throws IOException {
final long callId = nextCallId();
- addServers(peersInNewConf);
+ addServers(newGroup.getPeers().stream());
return sendRequest(new ReinitializeRequest(
- clientId, server, groupId, callId, peersInNewConf));
+ clientId, server, groupId, callId, newGroup));
}
- private void addServers(RaftPeer[] peersInNewConf) {
- clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains)
- .collect(Collectors.toList()));
+ private void addServers(Stream<RaftPeer> peersInNewConf) {
+ clientRpc.addServers(
+ peersInNewConf.filter(p -> !peers.contains(p))::iterator);
}
private RaftClientReply sendRequestWithRetry(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index 6e870b1..d00bed5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -34,10 +34,18 @@ public class RaftGroup {
/** The group of raft peers */
private final List<RaftPeer> peers;
+ public RaftGroup(RaftGroupId groupId) {
+ this(groupId, Collections.emptyList());
+ }
+
public RaftGroup(RaftGroupId groupId, RaftPeer[] peers) {
+ this(groupId, Arrays.asList(peers));
+ }
+
+ public RaftGroup(RaftGroupId groupId, List<RaftPeer> peers) {
Preconditions.assertTrue(peers != null);
this.groupId = groupId;
- this.peers = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(peers)));
+ this.peers = Collections.unmodifiableList(new ArrayList<>(peers));
}
public RaftGroupId getGroupId() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
index b69c845..3c6d468 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
@@ -17,23 +17,21 @@
*/
package org.apache.ratis.protocol;
-import java.util.Arrays;
-
public class ReinitializeRequest extends RaftClientRequest {
private final RaftGroup group;
public ReinitializeRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId, RaftPeer[] peers) {
+ RaftGroupId groupId, long callId, RaftGroup group) {
super(clientId, serverId, groupId, callId, null);
- this.group = new RaftGroup(groupId, peers);
+ this.group = group;
}
- public RaftGroup getPeersInGroup() {
+ public RaftGroup getGroup() {
return group;
}
@Override
public String toString() {
- return super.toString() + ", peers:" + Arrays.asList(getPeersInGroup());
+ return super.toString() + ", " + getGroup();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
index 86dd865..9d97f4d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
@@ -123,6 +123,10 @@ public interface NetUtils {
}
static String address2String(InetSocketAddress address) {
- return address.getHostName() + ":" + address.getPort();
+ final StringBuilder b = new StringBuilder(address.getHostName());
+ if (address.getAddress() instanceof Inet6Address) {
+ b.insert(0, '[').append(']');
+ }
+ return b.append(':').append(address.getPort()).toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 9a2d530..7e849fd 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -17,10 +17,7 @@
*/
package org.apache.ratis.util;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
import org.apache.ratis.shaded.proto.RaftProtos.*;
@@ -111,6 +108,16 @@ public class ProtoUtils {
return RaftGroupIdProto.newBuilder().setId(id.toByteString());
}
+ public static RaftGroup toRaftGroup(RaftGroupProto proto) {
+ return new RaftGroup(toRaftGroupId(proto.getGroupId()),
+ toRaftPeerArray(proto.getPeersList()));
+ }
+
+ public static RaftGroupProto.Builder toRaftGroupProtoBuilder(RaftGroup group) {
+ return RaftGroupProto.newBuilder()
+ .setGroupId(toRaftGroupIdProtoBuilder(group.getGroupId()))
+ .addAllPeers(toRaftPeerProtos(group.getPeers()));
+ }
public static boolean isConfigurationLogEntry(LogEntryProto entry) {
return entry.getLogEntryBodyCase() ==
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index ed145df..4dada3b 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -195,5 +195,5 @@ message SetConfigurationRequestProto {
// reinitialize request
message ReinitializeRequestProto {
RaftRpcRequestProto rpcRequest = 1;
- repeated RaftPeerProto peers = 2;
+ RaftGroupProto group = 2; // the target group.
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index d5a46f9..622e75a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -88,7 +88,7 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
}
/** Set all the peers (including the server being built) in the Raft cluster. */
- public Builder setPeers(RaftGroup group) {
+ public Builder setGroup(RaftGroup group) {
this.group = group;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 4b3b9f5..9be1e9a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -169,7 +169,7 @@ public class RaftServerProxy implements RaftServer {
final RaftServerImpl newImpl;
try {
- newImpl = initImpl(request.getPeersInGroup());
+ newImpl = initImpl(request.getGroup());
} catch (IOException ioe) {
final RaftException re = new RaftException(
"Failed to reinitialize, request=" + request, ioe);
@@ -178,7 +178,7 @@ public class RaftServerProxy implements RaftServer {
return new RaftClientReply(request, re);
}
- getServerRpc().addPeers(request.getPeersInGroup().getPeers());
+ getServerRpc().addPeers(request.getGroup().getPeers());
newImpl.start();
impl.complete(newImpl);
return new RaftClientReply(request, (Message) null);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0235de0e/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
index 20b8680..a4ac287 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
@@ -66,7 +66,7 @@ public abstract class ReinitializationBaseTest {
// Start server with an empty conf
final RaftGroupId groupId = RaftGroupId.createId();
- final RaftGroup group = new RaftGroup(groupId, RaftPeer.EMPTY_PEERS);
+ final RaftGroup group = new RaftGroup(groupId);
final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
.map(RaftPeerId::valueOf).collect(Collectors.toList());
@@ -81,11 +81,10 @@ public abstract class ReinitializationBaseTest {
Assert.assertNull(cluster.getLeader());
// Reinitialize servers
- final RaftPeer[] peers = cluster.getPeers().toArray(RaftPeer.EMPTY_PEERS);
- for(RaftPeer p : peers) {
- final RaftClient client = cluster.createClient(p.getId(),
- new RaftGroup(groupId, new RaftPeer[]{p}));
- client.reinitialize(peers, p.getId());
+ final RaftGroup newGroup = new RaftGroup(groupId, cluster.getPeers());
+ final RaftClient client = cluster.createClient(null, newGroup);
+ for(RaftPeer p : newGroup.getPeers()) {
+ client.reinitialize(newGroup, p.getId());
}
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
cluster.shutdown();
@@ -106,7 +105,7 @@ public abstract class ReinitializationBaseTest {
@Test
public void testReinitialize9Nodes() throws Exception {
final int[] idIndex = {5, 8, 9};
- runTestReinitializeMultiGroups(idIndex, 0);
+ runTestReinitializeMultiGroups(idIndex, 2);
}
private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception {
@@ -121,7 +120,7 @@ public abstract class ReinitializationBaseTest {
LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers());
// Start server with an empty conf
- final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.createId(), RaftPeer.EMPTY_PEERS);
+ final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.createId());
final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0))
.map(RaftPeerId::valueOf).collect(Collectors.toList());
@@ -147,8 +146,8 @@ public abstract class ReinitializationBaseTest {
LOG.info(i + ") starting " + groups[i]);
for(RaftPeer p : peers) {
- try(final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
- client.reinitialize(peers, p.getId());
+ try(final RaftClient client = cluster.createClient(p.getId(), emptyGroup)) {
+ client.reinitialize(groups[i], p.getId());
}
}
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid));
@@ -162,10 +161,11 @@ public abstract class ReinitializationBaseTest {
for (int i = 0; i < groups.length; i++) {
if (i != chosen) {
final RaftGroup g = groups[i];
+ final RaftGroup newGroup = new RaftGroup(g.getGroupId());
LOG.info(i + ") close " + cluster.printServers(g.getGroupId()));
for(RaftPeer p : g.getPeers()) {
try (final RaftClient client = cluster.createClient(p.getId(), g)) {
- client.reinitialize(RaftPeer.EMPTY_PEERS, p.getId());
+ client.reinitialize(newGroup, p.getId());
}
}
}
@@ -174,6 +174,7 @@ public abstract class ReinitializationBaseTest {
LOG.info("close groups: " + cluster.printServers());
// update chosen group to use all the peers
+ final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId());
final RaftPeer[] array = allPeers.toArray(RaftPeer.EMPTY_PEERS);
for(int i = 0; i < groups.length; i++) {
LOG.info(i + ") update " + cluster.printServers(groups[i].getGroupId()));
@@ -184,7 +185,7 @@ public abstract class ReinitializationBaseTest {
} else {
for(RaftPeer p : groups[i].getPeers()) {
try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
- client.reinitialize(array, p.getId());
+ client.reinitialize(newGroup, p.getId());
}
}
}