You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/01/23 09:08:53 UTC
[incubator-ratis] branch master updated: RATIS-1290. Allow separate admin and client service (#401). Contributed by Attila Doroszlai
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new eb66796 RATIS-1290. Allow separate admin and client service (#401). Contributed by Attila Doroszlai
eb66796 is described below
commit eb66796de745d3c70bc18dc707222af53a84ea5b
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Sat Jan 23 10:08:46 2021 +0100
RATIS-1290. Allow separate admin and client service (#401). Contributed by Attila Doroszlai
---
.../java/org/apache/ratis/protocol/RaftPeer.java | 64 +++++++-
.../main/java/org/apache/ratis/util/NetUtils.java | 25 ++++
.../java/org/apache/ratis/util/ProtoUtils.java | 2 +
.../src/test/java/org/apache/ratis/BaseTest.java | 2 +-
.../java/org/apache/ratis/util/TestNetUtils.java | 37 +++++
.../ratis/examples/arithmetic/cli/Server.java | 7 +
.../ratis/examples/common/SubCommandBase.java | 14 +-
.../ratis/examples/filestore/cli/Server.java | 7 +
.../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 57 +++++++
.../java/org/apache/ratis/grpc/GrpcFactory.java | 37 ++++-
.../grpc/client/GrpcClientProtocolClient.java | 54 ++++---
.../ratis/grpc/client/GrpcClientProtocolProxy.java | 2 +-
.../apache/ratis/grpc/client/GrpcClientRpc.java | 7 +-
.../org/apache/ratis/grpc/server/GrpcService.java | 163 ++++++++++++++++-----
.../apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 8 +
ratis-proto/src/main/proto/Raft.proto | 2 +
.../org/apache/ratis/server/RaftServerRpc.java | 11 +-
.../apache/ratis/server/impl/RaftServerProxy.java | 2 +
.../apache/ratis/server/impl/MiniRaftCluster.java | 18 ++-
.../server/impl/RaftReconfigurationBaseTest.java | 2 +-
20 files changed, 444 insertions(+), 77 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index d0a2d5a..e35efa8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -56,9 +56,23 @@ public final class RaftPeer {
return new Builder();
}
+ /** @return a new builder initialized from {@code peer} */
+ public static Builder newBuilder(RaftPeer peer) {
+ Objects.requireNonNull(peer, "peer == null");
+ return newBuilder()
+ .setId(peer.getId())
+ .setAddress(peer.getAddress())
+ .setAdminAddress(peer.getAdminAddress())
+ .setClientAddress(peer.getClientAddress())
+ .setDataStreamAddress(peer.getDataStreamAddress())
+ .setPriority(peer.getPriority());
+ }
+
public static class Builder {
private RaftPeerId id;
private String address;
+ private String adminAddress;
+ private String clientAddress;
private String dataStreamAddress;
private int priority;
@@ -84,6 +98,24 @@ public final class RaftPeer {
return setAddress(NetUtils.address2String(address));
}
+ public Builder setAdminAddress(String addr) {
+ this.adminAddress = addr;
+ return this;
+ }
+
+ public Builder setAdminAddress(InetSocketAddress addr) {
+ return setAdminAddress(NetUtils.address2String(addr));
+ }
+
+ public Builder setClientAddress(String addr) {
+ this.clientAddress = addr;
+ return this;
+ }
+
+ public Builder setClientAddress(InetSocketAddress addr) {
+ return setClientAddress(NetUtils.address2String(addr));
+ }
+
public Builder setDataStreamAddress(String dataStreamAddress) {
this.dataStreamAddress = dataStreamAddress;
return this;
@@ -104,7 +136,7 @@ public final class RaftPeer {
public RaftPeer build() {
return new RaftPeer(
Objects.requireNonNull(id, "The 'id' field is not initialized."),
- address, dataStreamAddress, priority);
+ address, adminAddress, clientAddress, dataStreamAddress, priority);
}
}
@@ -112,6 +144,8 @@ public final class RaftPeer {
private final RaftPeerId id;
/** The RPC address of the peer. */
private final String address;
+ private final String adminAddress;
+ private final String clientAddress;
/** The DataStream address of the peer. */
private final String dataStreamAddress;
/** The priority of the peer. */
@@ -119,10 +153,14 @@ public final class RaftPeer {
private final Supplier<RaftPeerProto> raftPeerProto;
- private RaftPeer(RaftPeerId id, String address, String dataStreamAddress, int priority) {
+ private RaftPeer(RaftPeerId id,
+ String address, String adminAddress, String clientAddress, String dataStreamAddress,
+ int priority) {
this.id = Objects.requireNonNull(id, "id == null");
this.address = address;
this.dataStreamAddress = dataStreamAddress;
+ this.adminAddress = adminAddress;
+ this.clientAddress = clientAddress;
this.priority = priority;
this.raftPeerProto = JavaUtils.memoize(this::buildRaftPeerProto);
}
@@ -132,6 +170,8 @@ public final class RaftPeer {
.setId(getId().toByteString());
Optional.ofNullable(getAddress()).ifPresent(builder::setAddress);
Optional.ofNullable(getDataStreamAddress()).ifPresent(builder::setDataStreamAddress);
+ Optional.ofNullable(getClientAddress()).ifPresent(builder::setClientAddress);
+ Optional.ofNullable(getAdminAddress()).ifPresent(builder::setAdminAddress);
builder.setPriority(priority);
return builder.build();
}
@@ -141,11 +181,23 @@ public final class RaftPeer {
return id;
}
- /** @return The RPC address of the peer. */
+ /** @return The RPC address of the peer for server-server communication. */
public String getAddress() {
return address;
}
+ /** @return The RPC address of the peer for admin operations.
+ * May be {@code null}, in which case {@link #getAddress()} should be used. */
+ public String getAdminAddress() {
+ return adminAddress;
+ }
+
+ /** @return The RPC address of the peer for client operations.
+ * May be {@code null}, in which case {@link #getAddress()} should be used. */
+ public String getClientAddress() {
+ return clientAddress;
+ }
+
/** @return The data stream address of the peer. */
public String getDataStreamAddress() {
return dataStreamAddress;
@@ -163,9 +215,13 @@ public final class RaftPeer {
@Override
public String toString() {
final String rpc = address != null? "|rpc:" + address: "";
+ final String admin = adminAddress != null && !Objects.equals(address, adminAddress)
+ ? "|admin:" + adminAddress : "";
+ final String client = clientAddress != null && !Objects.equals(address, clientAddress)
+ ? "|client:" + clientAddress : "";
final String data = dataStreamAddress != null? "|dataStream:" + dataStreamAddress: "";
final String p = "|priority:" + priority;
- return id + rpc + data + p;
+ return id + rpc + admin + client + data + p;
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
index 6fe9802..10d2fb4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
@@ -20,8 +20,12 @@ package org.apache.ratis.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -112,6 +116,27 @@ public interface NetUtils {
return addr;
}
+ /** Creates {@code count} unique local addresses. They may conflict with
+ * addresses created later, but not with one another. */
+ static List<InetSocketAddress> createLocalServerAddress(int count) {
+ List<InetSocketAddress> list = new ArrayList<>(count);
+ List<ServerSocket> sockets = new ArrayList<>(count);
+ try {
+ for (int i = 0; i < count; i++) {
+ ServerSocket s = new ServerSocket();
+ sockets.add(s);
+ s.setReuseAddress(true);
+ s.bind(null);
+ list.add((InetSocketAddress) s.getLocalSocketAddress());
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } finally {
+ IOUtils.cleanup(null, sockets.toArray(new Closeable[0]));
+ }
+ return list;
+ }
+
static InetSocketAddress createLocalServerAddress() {
try(ServerSocket s = new ServerSocket()) {
s.setReuseAddress(true);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index d36141c..4fff8c1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -117,6 +117,8 @@ public interface ProtoUtils {
.setId(RaftPeerId.valueOf(p.getId()))
.setAddress(p.getAddress())
.setDataStreamAddress(p.getDataStreamAddress())
+ .setClientAddress(p.getClientAddress())
+ .setAdminAddress(p.getAdminAddress())
.setPriority(p.getPriority())
.build();
}
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index c77b01f..68c8015 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -74,7 +74,7 @@ public abstract class BaseTest {
RaftPeer peer = peers.get(i);
final int priority = peer.equals(suggestedLeader)? 2: 1;
peersWithPriority.add(
- RaftPeer.newBuilder().setId(peer.getId()).setAddress(peer.getAddress()).setPriority(priority).build());
+ RaftPeer.newBuilder(peer).setPriority(priority).build());
}
return peersWithPriority;
}
diff --git a/ratis-common/src/test/java/org/apache/ratis/util/TestNetUtils.java b/ratis-common/src/test/java/org/apache/ratis/util/TestNetUtils.java
new file mode 100644
index 0000000..8fea1a1
--- /dev/null
+++ b/ratis-common/src/test/java/org/apache/ratis/util/TestNetUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestNetUtils {
+
+ @Test
+ public void createsUniqueAddresses() {
+ for (int i = 0; i < 10; i++) {
+ List<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(100);
+ assertEquals(addresses.stream().distinct().collect(Collectors.toList()), addresses);
+ }
+ }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
index 3647231..8836b6a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Server.java
@@ -35,6 +35,7 @@ import org.apache.ratis.util.NetUtils;
import java.io.File;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@@ -59,6 +60,12 @@ public class Server extends SubCommandBase {
final int port = NetUtils.createSocketAddr(getPeer(peerId).getAddress()).getPort();
GrpcConfigKeys.Server.setPort(properties, port);
+
+ Optional.ofNullable(getPeer(peerId).getClientAddress()).ifPresent(address ->
+ GrpcConfigKeys.Client.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
+ Optional.ofNullable(getPeer(peerId).getAdminAddress()).ifPresent(address ->
+ GrpcConfigKeys.Admin.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
+
properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, Integer.MAX_VALUE);
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
StateMachine stateMachine = new ArithmeticStateMachine();
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
index a3263e7..d879691 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
@@ -36,8 +36,8 @@ public abstract class SubCommandBase {
private String raftGroupId = "demoRaftGroup123";
@Parameter(names = {"--peers", "-r"}, description =
- "Raft peers (format: name:host:port:dataStreamPort,"
- + "name:host:port)", required = true)
+ "Raft peers (format: name:host:port:dataStreamPort:clientPort:adminPort,"
+ + "...)", required = true)
private String peers;
public static RaftPeer[] parsePeers(String peers) {
@@ -45,8 +45,14 @@ public abstract class SubCommandBase {
String[] addressParts = address.split(":");
RaftPeer.Builder builder = RaftPeer.newBuilder();
builder.setId(addressParts[0]).setAddress(addressParts[1] + ":" + addressParts[2]);
- if (addressParts.length == 4) {
- builder.setDataStreamAddress(addressParts[1] + ":" + addressParts[3]).build();
+ if (addressParts.length >= 4) {
+ builder.setDataStreamAddress(addressParts[1] + ":" + addressParts[3]);
+ if (addressParts.length >= 5) {
+ builder.setClientAddress(addressParts[1] + ":" + addressParts[4]);
+ if (addressParts.length >= 6) {
+ builder.setAdminAddress(addressParts[1] + ":" + addressParts[5]);
+ }
+ }
}
return builder.build();
}).toArray(RaftPeer[]::new);
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 7fc928d..e21326d 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -44,6 +44,7 @@ import org.apache.ratis.util.TimeDuration;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@@ -84,6 +85,12 @@ public class Server extends SubCommandBase {
final int port = NetUtils.createSocketAddr(getPeer(peerId).getAddress()).getPort();
GrpcConfigKeys.Server.setPort(properties, port);
+
+ Optional.ofNullable(getPeer(peerId).getClientAddress()).ifPresent(address ->
+ GrpcConfigKeys.Client.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
+ Optional.ofNullable(getPeer(peerId).getAdminAddress()).ifPresent(address ->
+ GrpcConfigKeys.Admin.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
+
String dataStreamAddress = getPeer(peerId).getDataStreamAddress();
if (dataStreamAddress != null) {
final int dataStreamport = NetUtils.createSocketAddr(dataStreamAddress).getPort();
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 3f97523..eca51ad 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -96,6 +96,54 @@ public interface GrpcConfigKeys {
}
}
+ interface Admin {
+ String PREFIX = GrpcConfigKeys.PREFIX + ".admin";
+
+ String PORT_KEY = PREFIX + ".port";
+ int PORT_DEFAULT = -1;
+ static int port(RaftProperties properties) {
+ final int port = getInt(properties::getInt,
+ PORT_KEY, PORT_DEFAULT, getDefaultLog(), requireMin(-1), requireMax(65536));
+ return port != PORT_DEFAULT ? port : Server.port(properties);
+ }
+ static void setPort(RaftProperties properties, int port) {
+ setInt(properties::setInt, PORT_KEY, port);
+ }
+
+ String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
+ Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
+ static GrpcTlsConfig tlsConf(Parameters parameters) {
+ return parameters != null ? parameters.get(TLS_CONF_PARAMETER, TLS_CONF_CLASS): null;
+ }
+ static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
+ parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
+ }
+ }
+
+ interface Client {
+ String PREFIX = GrpcConfigKeys.PREFIX + ".client";
+
+ String PORT_KEY = PREFIX + ".port";
+ int PORT_DEFAULT = -1;
+ static int port(RaftProperties properties) {
+ final int port = getInt(properties::getInt,
+ PORT_KEY, PORT_DEFAULT, getDefaultLog(), requireMin(-1), requireMax(65536));
+ return port != PORT_DEFAULT ? port : Server.port(properties);
+ }
+ static void setPort(RaftProperties properties, int port) {
+ setInt(properties::setInt, PORT_KEY, port);
+ }
+
+ String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
+ Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
+ static GrpcTlsConfig tlsConf(Parameters parameters) {
+ return parameters != null ? parameters.get(TLS_CONF_PARAMETER, TLS_CONF_CLASS): null;
+ }
+ static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
+ parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
+ }
+ }
+
interface Server {
String PREFIX = GrpcConfigKeys.PREFIX + ".server";
@@ -109,6 +157,15 @@ public interface GrpcConfigKeys {
setInt(properties::setInt, PORT_KEY, port);
}
+ String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
+ Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
+ static GrpcTlsConfig tlsConf(Parameters parameters) {
+ return parameters != null ? parameters.get(TLS_CONF_PARAMETER, TLS_CONF_CLASS): null;
+ }
+ static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
+ parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
+ }
+
String LEADER_OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".leader.outstanding.appends.max";
int LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT = 128;
static int leaderOutstandingAppendsMax(RaftProperties properties) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 3f33e44..5da5fc5 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -53,6 +53,9 @@ public class GrpcFactory implements ServerFactory, ClientFactory {
}
private final GrpcTlsConfig tlsConfig;
+ private final GrpcTlsConfig adminTlsConfig;
+ private final GrpcTlsConfig clientTlsConfig;
+ private final GrpcTlsConfig serverTlsConfig;
public static Parameters newRaftParameters(GrpcTlsConfig conf) {
final Parameters p = new Parameters();
@@ -61,17 +64,42 @@ public class GrpcFactory implements ServerFactory, ClientFactory {
}
public GrpcFactory(Parameters parameters) {
- this(GrpcConfigKeys.TLS.conf(parameters));
+ this(
+ GrpcConfigKeys.TLS.conf(parameters),
+ GrpcConfigKeys.Admin.tlsConf(parameters),
+ GrpcConfigKeys.Client.tlsConf(parameters),
+ GrpcConfigKeys.Server.tlsConf(parameters)
+ );
}
public GrpcFactory(GrpcTlsConfig tlsConfig) {
+ this(tlsConfig, null, null, null);
+ }
+
+ private GrpcFactory(GrpcTlsConfig tlsConfig, GrpcTlsConfig adminTlsConfig,
+ GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) {
this.tlsConfig = tlsConfig;
+ this.adminTlsConfig = adminTlsConfig;
+ this.clientTlsConfig = clientTlsConfig;
+ this.serverTlsConfig = serverTlsConfig;
}
public GrpcTlsConfig getTlsConfig() {
return tlsConfig;
}
+ public GrpcTlsConfig getAdminTlsConfig() {
+ return adminTlsConfig != null ? adminTlsConfig : tlsConfig;
+ }
+
+ public GrpcTlsConfig getClientTlsConfig() {
+ return clientTlsConfig != null ? clientTlsConfig : tlsConfig;
+ }
+
+ public GrpcTlsConfig getServerTlsConfig() {
+ return serverTlsConfig != null ? serverTlsConfig : tlsConfig;
+ }
+
@Override
public SupportedRpcType getRpcType() {
return SupportedRpcType.GRPC;
@@ -87,13 +115,16 @@ public class GrpcFactory implements ServerFactory, ClientFactory {
checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::info);
return GrpcService.newBuilder()
.setServer(server)
- .setTlsConfig(tlsConfig)
+ .setAdminTlsConfig(getAdminTlsConfig())
+ .setServerTlsConfig(getServerTlsConfig())
+ .setClientTlsConfig(getClientTlsConfig())
.build();
}
@Override
public GrpcClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties) {
checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::debug);
- return new GrpcClientRpc(clientId, properties, getTlsConfig());
+ return new GrpcClientRpc(clientId, properties,
+ getAdminTlsConfig(), getClientTlsConfig());
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 61bee8c..dc8def1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -40,7 +40,6 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc;
import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc;
-import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.ClientId;
@@ -65,6 +64,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -79,13 +79,13 @@ public class GrpcClientProtocolClient implements Closeable {
private final Supplier<String> name;
private final RaftPeer target;
- private final ManagedChannel channel;
+ private final ManagedChannel clientChannel;
+ private final ManagedChannel adminChannel;
private final TimeDuration requestTimeoutDuration;
private final TimeDuration watchRequestTimeoutDuration;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
- private final RaftClientProtocolServiceBlockingStub blockingStub;
private final RaftClientProtocolServiceStub asyncStub;
private final AdminProtocolServiceBlockingStub adminBlockingStub;
@@ -93,15 +93,41 @@ public class GrpcClientProtocolClient implements Closeable {
private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers = new AtomicReference<>();
- GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties, GrpcTlsConfig tlsConf) {
+ GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties,
+ GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig) {
this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
+ final MetricClientInterceptor monitoringInterceptor = new MetricClientInterceptor(getName());
+
+ final String clientAddress = Optional.ofNullable(target.getClientAddress())
+ .filter(x -> !x.isEmpty()).orElse(target.getAddress());
+ final String adminAddress = Optional.ofNullable(target.getAdminAddress())
+ .filter(x -> !x.isEmpty()).orElse(target.getAddress());
+ final boolean separateAdminChannel = !Objects.equals(clientAddress, adminAddress);
+
+ clientChannel = buildChannel(clientAddress, clientTlsConfig,
+ flowControlWindow, maxMessageSize, monitoringInterceptor);
+ adminChannel = separateAdminChannel
+ ? buildChannel(adminAddress, adminTlsConfig,
+ flowControlWindow, maxMessageSize, monitoringInterceptor)
+ : clientChannel;
+
+ asyncStub = RaftClientProtocolServiceGrpc.newStub(clientChannel);
+ adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(adminChannel);
+ this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
+ this.watchRequestTimeoutDuration =
+ RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
+ }
+
+ private ManagedChannel buildChannel(String address, GrpcTlsConfig tlsConf,
+ SizeInBytes flowControlWindow, SizeInBytes maxMessageSize,
+ MetricClientInterceptor monitoringInterceptor) {
NettyChannelBuilder channelBuilder =
- NettyChannelBuilder.forTarget(target.getAddress());
+ NettyChannelBuilder.forTarget(address);
- if (tlsConf!= null) {
+ if (tlsConf != null) {
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
if (tlsConf.isFileBasedConfig()) {
sslContextBuilder.trustManager(tlsConf.getTrustStoreFile());
@@ -127,19 +153,10 @@ public class GrpcClientProtocolClient implements Closeable {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
- MetricClientInterceptor monitoringInterceptor = new MetricClientInterceptor(getName());
-
- channel = channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
+ return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
.maxInboundMessageSize(maxMessageSize.getSizeInt())
.intercept(monitoringInterceptor)
.build();
-
- blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
- asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
- adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
- this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
- this.watchRequestTimeoutDuration =
- RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
}
String getName() {
@@ -150,7 +167,10 @@ public class GrpcClientProtocolClient implements Closeable {
public void close() {
Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
- GrpcUtil.shutdownManagedChannel(channel);
+ GrpcUtil.shutdownManagedChannel(clientChannel);
+ if (clientChannel != adminChannel) {
+ GrpcUtil.shutdownManagedChannel(adminChannel);
+ }
scheduler.close();
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
index 666b1c3..95119ef 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
@@ -37,7 +37,7 @@ public class GrpcClientProtocolProxy implements Closeable {
public GrpcClientProtocolProxy(ClientId clientId, RaftPeer target,
Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
RaftProperties properties, GrpcTlsConfig tlsConfig) {
- proxy = new GrpcClientProtocolClient(clientId, target, properties, tlsConfig);
+ proxy = new GrpcClientProtocolClient(clientId, target, properties, tlsConfig, tlsConfig);
this.responseHandlerCreation = responseHandlerCreation;
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index a579550..5aeaafb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -50,14 +50,13 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
private final ClientId clientId;
private final int maxMessageSize;
- private final GrpcTlsConfig tlsConfig;
- public GrpcClientRpc(ClientId clientId, RaftProperties properties, GrpcTlsConfig tlsConfig) {
+ public GrpcClientRpc(ClientId clientId, RaftProperties properties,
+ GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig) {
super(new PeerProxyMap<>(clientId.toString(),
- p -> new GrpcClientProtocolClient(clientId, p, properties, tlsConfig)));
+ p -> new GrpcClientProtocolClient(clientId, p, properties, adminTlsConfig, clientTlsConfig)));
this.clientId = clientId;
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
- this.tlsConfig = tlsConfig;
}
@Override
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index f570672..a618678 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.Supplier;
import static org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;
@@ -56,6 +58,9 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
public static final class Builder {
private RaftServer server;
private GrpcTlsConfig tlsConfig;
+ private GrpcTlsConfig adminTlsConfig;
+ private GrpcTlsConfig clientTlsConfig;
+ private GrpcTlsConfig serverTlsConfig;
private Builder() {}
@@ -65,7 +70,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
}
public GrpcService build() {
- return new GrpcService(server, getTlsConfig());
+ return new GrpcService(server, adminTlsConfig, clientTlsConfig, serverTlsConfig);
}
public Builder setTlsConfig(GrpcTlsConfig tlsConfig) {
@@ -73,6 +78,21 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
return this;
}
+ public Builder setAdminTlsConfig(GrpcTlsConfig config) {
+ this.adminTlsConfig = config;
+ return this;
+ }
+
+ public Builder setClientTlsConfig(GrpcTlsConfig config) {
+ this.clientTlsConfig = config;
+ return this;
+ }
+
+ public Builder setServerTlsConfig(GrpcTlsConfig config) {
+ this.serverTlsConfig = config;
+ return this;
+ }
+
public GrpcTlsConfig getTlsConfig() {
return tlsConfig;
}
@@ -82,8 +102,10 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
return new Builder();
}
- private final Server server;
+ private final List<Server> servers = new ArrayList<>(3);
private final Supplier<InetSocketAddress> addressSupplier;
+ private final Supplier<InetSocketAddress> clientServerAddressSupplier;
+ private final Supplier<InetSocketAddress> adminServerAddressSupplier;
private final GrpcClientProtocolService clientProtocolService;
@@ -93,23 +115,28 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
return serverInterceptor;
}
- private GrpcService(RaftServer server, GrpcTlsConfig tlsConfig) {
+ private GrpcService(RaftServer server,
+ GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) {
this(server, server::getId,
- GrpcConfigKeys.Server.port(server.getProperties()),
+ GrpcConfigKeys.Admin.port(server.getProperties()), adminTlsConfig,
+ GrpcConfigKeys.Client.port(server.getProperties()), clientTlsConfig,
+ GrpcConfigKeys.Server.port(server.getProperties()), serverTlsConfig,
GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
- RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
- tlsConfig);
+ RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
}
@SuppressWarnings("checkstyle:ParameterNumber") // private constructor
- private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port,
+ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
+ int adminPort, GrpcTlsConfig adminTlsConfig,
+ int clientPort, GrpcTlsConfig clientTlsConfig,
+ int serverPort, GrpcTlsConfig serverTlsConfig,
SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
- SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig) {
+ SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration) {
super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
- requestTimeoutDuration, tlsConfig)));
+ requestTimeoutDuration, serverTlsConfig)));
if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
throw new IllegalArgumentException("Illegal configuration: "
+ RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + appenderBufferSize
@@ -120,20 +147,70 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
this.serverInterceptor = new MetricServerInterceptor(
idSupplier,
- JavaUtils.getClassSimpleName(getClass()) + "_" + port
+ JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort
);
+ final boolean separateAdminServer = adminPort != serverPort && adminPort > 0;
+ final boolean separateClientServer = clientPort != serverPort && clientPort > 0;
+
+ final NettyServerBuilder serverBuilder =
+ startBuildingNettyServer(serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow);
+ serverBuilder.addService(ServerInterceptors.intercept(
+ new GrpcServerProtocolService(idSupplier, raftServer), serverInterceptor));
+ if (!separateAdminServer) {
+ addAdminService(raftServer, serverBuilder);
+ }
+ if (!separateClientServer) {
+ addClientService(serverBuilder);
+ }
+
+ final Server server = serverBuilder.build();
+ servers.add(server);
+ addressSupplier = newAddressSupplier(serverPort, server);
+
+ if (separateAdminServer) {
+ final NettyServerBuilder builder =
+ startBuildingNettyServer(adminPort, adminTlsConfig, grpcMessageSizeMax, flowControlWindow);
+ addAdminService(raftServer, builder);
+ final Server adminServer = builder.build();
+ servers.add(adminServer);
+ adminServerAddressSupplier = newAddressSupplier(adminPort, adminServer);
+ } else {
+ adminServerAddressSupplier = addressSupplier;
+ }
+
+ if (separateClientServer) {
+ final NettyServerBuilder builder =
+ startBuildingNettyServer(clientPort, clientTlsConfig, grpcMessageSizeMax, flowControlWindow);
+ addClientService(builder);
+ final Server clientServer = builder.build();
+ servers.add(clientServer);
+ clientServerAddressSupplier = newAddressSupplier(clientPort, clientServer);
+ } else {
+ clientServerAddressSupplier = addressSupplier;
+ }
+ }
+
+ private MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port, Server server) {
+ return JavaUtils.memoize(() -> new InetSocketAddress(port != 0 ? port : server.getPort()));
+ }
+
+ private void addClientService(NettyServerBuilder builder) {
+ builder.addService(ServerInterceptors.intercept(clientProtocolService, serverInterceptor));
+ }
+
+ private void addAdminService(RaftServer raftServer, NettyServerBuilder nettyServerBuilder) {
+ nettyServerBuilder.addService(ServerInterceptors.intercept(
+ new GrpcAdminProtocolService(raftServer),
+ serverInterceptor));
+ }
+
+ private static NettyServerBuilder startBuildingNettyServer(int port, GrpcTlsConfig tlsConfig,
+ SizeInBytes grpcMessageSizeMax, SizeInBytes flowControlWindow) {
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.withChildOption(ChannelOption.SO_REUSEADDR, true)
.maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
- .flowControlWindow(flowControlWindow.getSizeInt())
- .addService(ServerInterceptors.intercept(
- new GrpcServerProtocolService(idSupplier, raftServer),
- serverInterceptor))
- .addService(ServerInterceptors.intercept(clientProtocolService, serverInterceptor))
- .addService(ServerInterceptors.intercept(
- new GrpcAdminProtocolService(raftServer),
- serverInterceptor));
+ .flowControlWindow(flowControlWindow.getSizeInt());
if (tlsConfig != null) {
SslContextBuilder sslContextBuilder =
@@ -157,8 +234,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
throw new IllegalArgumentException("Failed to build SslContext, tlsConfig=" + tlsConfig, ex);
}
}
- server = nettyServerBuilder.build();
- addressSupplier = JavaUtils.memoize(() -> new InetSocketAddress(port != 0? port: server.getPort()));
+ return nettyServerBuilder;
}
@Override
@@ -168,28 +244,32 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
@Override
public void startImpl() {
- try {
- server.start();
- } catch (IOException e) {
- ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG);
+ for (Server server : servers) {
+ try {
+ server.start();
+ } catch (IOException e) {
+ ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG);
+ }
+ LOG.info("{}: {} started, listening on {}",
+ getId(), JavaUtils.getClassSimpleName(getClass()), server.getPort());
}
- LOG.info("{}: {} started, listening on {}",
- getId(), JavaUtils.getClassSimpleName(getClass()), getInetSocketAddress());
}
@Override
public void closeImpl() throws IOException {
- final String name = getId() + ": shutdown server with port " + server.getPort();
- LOG.info("{} now", name);
- final Server s = server.shutdownNow();
- super.closeImpl();
- try {
- s.awaitTermination();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw IOUtils.toInterruptedIOException(name + " failed", e);
+ for (Server server : servers) {
+ final String name = getId() + ": shutdown server with port " + server.getPort();
+ LOG.info("{} now", name);
+ final Server s = server.shutdownNow();
+ super.closeImpl();
+ try {
+ s.awaitTermination();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw IOUtils.toInterruptedIOException(name + " failed", e);
+ }
+ LOG.info("{} successfully", name);
}
- LOG.info("{} successfully", name);
}
@Override
@@ -203,6 +283,16 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
}
@Override
+ public InetSocketAddress getClientServerAddress() {
+ return clientServerAddressSupplier.get();
+ }
+
+ @Override
+ public InetSocketAddress getAdminServerAddress() {
+ return adminServerAddressSupplier.get();
+ }
+
+ @Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) {
throw new UnsupportedOperationException(
"Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
@@ -231,4 +321,5 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
return getProxies().getProxy(target).startLeaderElection(request);
}
+
}
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 4864183..1528349 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -23,10 +23,14 @@ import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.util.NetUtils;
+
+import java.util.Optional;
/**
* A {@link MiniRaftCluster} with {{@link SupportedRpcType#GRPC}} and data stream disabled.
@@ -58,6 +62,10 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
@Override
protected Parameters setPropertiesAndInitParameters(RaftPeerId id, RaftGroup group, RaftProperties properties) {
GrpcConfigKeys.Server.setPort(properties, getPort(id, group));
+ Optional.ofNullable(getAddress(id, group, RaftPeer::getClientAddress)).ifPresent(address ->
+ GrpcConfigKeys.Client.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
+ Optional.ofNullable(getAddress(id, group, RaftPeer::getAdminAddress)).ifPresent(address ->
+ GrpcConfigKeys.Admin.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
return null;
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 0ff5d66..837b2f5 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -26,6 +26,8 @@ message RaftPeerProto {
string address = 2; // e.g. address of the RPC server
uint32 priority = 3; // priority of the peer
string dataStreamAddress = 4; // address of the data stream server
+ string clientAddress = 5; // address of the client RPC server
+ string adminAddress = 6; // address of the admin RPC server
}
message RaftPeerIdProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index df434d3..5c8f99c 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -35,9 +35,18 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, RaftPeer
/** Start the RPC service. */
void start() throws IOException;
- /** @return the address where this RPC server is listening to. */
+ /** @return the address where this RPC server is listening */
InetSocketAddress getInetSocketAddress();
+ /** @return the address where this RPC server is listening for client requests */
+ default InetSocketAddress getClientServerAddress() {
+ return getInetSocketAddress();
+ }
+ /** @return the address where this RPC server is listening for admin requests */
+ default InetSocketAddress getAdminServerAddress() {
+ return getInetSocketAddress();
+ }
+
/** Handle the given exception. For example, try reconnecting. */
void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index e6408f8..f03be97 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -296,6 +296,8 @@ class RaftServerProxy implements RaftServer {
.setId(getId())
.setAddress(getServerRpc().getInetSocketAddress())
.setDataStreamAddress(getDataStreamServerRpc().getInetSocketAddress())
+ .setClientAddress(getServerRpc().getClientServerAddress())
+ .setAdminAddress(getServerRpc().getAdminServerAddress())
.build();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index ed4c1a0..2d2c219 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -62,6 +62,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -73,6 +74,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -188,9 +190,12 @@ public abstract class MiniRaftCluster implements Closeable {
}
protected int getPort(RaftPeerId id, RaftGroup g) {
+ return getPort(getAddress(id, g, RaftPeer::getAddress));
+ }
+
+ protected String getAddress(RaftPeerId id, RaftGroup g, Function<RaftPeer, String> getAddress) {
final RaftPeer p = g != null? g.getPeer(id): peers.get(id);
- final String address = p == null? null : p.getAddress();
- return getPort(address);
+ return p == null? null : getAddress.apply(p);
}
protected int getDataStreamPort(RaftPeerId id, RaftGroup g) {
@@ -219,12 +224,15 @@ public abstract class MiniRaftCluster implements Closeable {
}
public static RaftGroup initRaftGroup(Collection<String> ids) {
+ Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * ids.size()).iterator();
final RaftPeer[] peers = ids.stream()
.map(RaftPeerId::valueOf)
.map(id -> RaftPeer.newBuilder().setId(id)
- .setAddress(NetUtils.createLocalServerAddress())
- .setDataStreamAddress(NetUtils.createLocalServerAddress())
- .build())
+ .setAddress(addresses.next())
+ .setAdminAddress(addresses.next())
+ .setClientAddress(addresses.next())
+ .setDataStreamAddress(addresses.next())
+ .build())
.toArray(RaftPeer[]::new);
return RaftGroup.valueOf(RaftGroupId.randomId(), peers);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 53231b2..6e75224 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -106,7 +106,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
for (int i = 0; i < peers.size(); i++) {
RaftPeer peer = peers.get(i);
peersWithPriority.add(
- RaftPeer.newBuilder().setId(peer.getId()).setAddress(peer.getAddress()).setPriority(i).build());
+ RaftPeer.newBuilder(peer).setPriority(i).build());
}
final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peersWithPriority);