You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/09 04:05:16 UTC
[incubator-ratis] branch master updated: RATIS-1138. Add
dataStreamAddress to RaftPeer. (#262)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 311229c RATIS-1138. Add dataStreamAddress to RaftPeer. (#262)
311229c is described below
commit 311229c5428cfa62d6db2d3dc56973a2b8833678
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 9 12:05:06 2020 +0800
RATIS-1138. Add dataStreamAddress to RaftPeer. (#262)
---
.../org/apache/ratis/client/DataStreamClient.java | 32 +++++++--
.../apache/ratis/client/impl/ClientImplUtils.java | 18 +++--
.../ratis/client/impl/DataStreamClientImpl.java | 25 +++----
.../apache/ratis/client/impl/RaftClientImpl.java | 50 +++++++++-----
.../java/org/apache/ratis/protocol/RaftGroup.java | 4 +-
.../java/org/apache/ratis/protocol/RaftPeer.java | 77 ++++++++++++++++++++--
.../main/java/org/apache/ratis/util/NetUtils.java | 3 +
.../java/org/apache/ratis/util/ProtoUtils.java | 7 +-
.../ratis/netty/client/NettyClientStreamRpc.java | 2 +-
.../ratis/netty/server/NettyServerStreamRpc.java | 2 +-
ratis-proto/src/main/proto/Raft.proto | 3 +-
.../ratis/datastream/DataStreamBaseTest.java | 29 ++------
.../ratis/datastream/TestDataStreamNetty.java | 27 +++++++-
13 files changed, 193 insertions(+), 86 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index 0bc0afc..b00f182 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -17,9 +17,11 @@
*/
package org.apache.ratis.client;
-import org.apache.ratis.client.impl.DataStreamClientImpl;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.impl.ClientImplUtils;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
@@ -27,6 +29,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.Objects;
+import java.util.Optional;
/**
* A user interface extending {@link DataStreamRpcApi}.
@@ -43,7 +47,8 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable {
/** To build {@link DataStreamClient} objects */
class Builder {
- private RaftPeer raftServer;
+ private RaftPeer dataStreamServer;
+ private DataStreamClientRpc dataStreamClientRpc;
private RaftProperties properties;
private Parameters parameters;
private RaftGroupId raftGroupId;
@@ -51,8 +56,18 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable {
private Builder() {}
- public DataStreamClientImpl build(){
- return new DataStreamClientImpl(clientId, raftGroupId, raftServer, properties, parameters);
+ public DataStreamClient build() {
+ Objects.requireNonNull(dataStreamServer, "The 'dataStreamServer' field is not initialized.");
+ if (properties != null) {
+ if (dataStreamClientRpc == null) {
+ final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
+ dataStreamClientRpc = DataStreamClientFactory.newInstance(type, parameters)
+ .newDataStreamClientRpc(dataStreamServer, properties);
+ }
+ }
+ return ClientImplUtils.newDataStreamClient(
+ Optional.ofNullable(clientId).orElseGet(ClientId::randomId),
+ raftGroupId, dataStreamServer, dataStreamClientRpc, properties);
}
public Builder setClientId(ClientId clientId) {
@@ -65,8 +80,13 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable {
return this;
}
- public Builder setRaftServer(RaftPeer peer) {
- this.raftServer = peer;
+ public Builder setDataStreamServer(RaftPeer dataStreamServer) {
+ this.dataStreamServer = dataStreamServer;
+ return this;
+ }
+
+ public Builder setDataStreamClientRpc(DataStreamClientRpc dataStreamClientRpc) {
+ this.dataStreamClientRpc = dataStreamClientRpc;
return this;
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 2222ece..eeddb22 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,25 +17,29 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.client.DataStreamClient;
+import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
/** Client utilities for internal use. */
-public final class ClientImplUtils {
- private ClientImplUtils() {
-
- }
-
- public static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
+public interface ClientImplUtils {
+ static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
RaftPeerId leaderId, RaftPeer primaryDataStreamServer, RaftClientRpc clientRpc, RaftProperties properties,
RetryPolicy retryPolicy) {
return new RaftClientImpl(clientId, group, leaderId, primaryDataStreamServer, clientRpc, properties,
retryPolicy);
}
+
+ static DataStreamClient newDataStreamClient(ClientId clientId, RaftGroupId groupId, RaftPeer primaryDataStreamServer,
+ DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+ return new DataStreamClientImpl(clientId, groupId, primaryDataStreamServer, dataStreamClientRpc, properties);
+ }
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 6c07374..2798de6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -17,25 +17,20 @@
*/
package org.apache.ratis.client.impl;
-import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.DataStreamClient;
-import org.apache.ratis.client.DataStreamClientFactory;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
-import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.RaftPeer;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
/**
@@ -46,20 +41,16 @@ public class DataStreamClientImpl implements DataStreamClient {
private final ClientId clientId;
private final RaftGroupId groupId;
- private final RaftPeer raftServer;
+ private final RaftPeer dataStreamServer;
private final DataStreamClientRpc dataStreamClientRpc;
private final OrderedStreamAsync orderedStreamAsync;
- public DataStreamClientImpl(
- ClientId clientId, RaftGroupId groupId, RaftPeer server, RaftProperties properties, Parameters parameters) {
+ DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer dataStreamServer,
+ DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
this.clientId = clientId;
this.groupId = groupId;
- this.raftServer = Objects.requireNonNull(server, "server == null");
-
- final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
- this.dataStreamClientRpc = DataStreamClientFactory.newInstance(type, parameters)
- .newDataStreamClientRpc(raftServer, properties);
-
+ this.dataStreamServer = dataStreamServer;
+ this.dataStreamClientRpc = dataStreamClientRpc;
this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
}
@@ -124,7 +115,7 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public DataStreamOutputRpc stream() {
RaftClientRequest request = new RaftClientRequest(
- clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
+ clientId, dataStreamServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
return new DataStreamOutputImpl(request);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 62baac5..d28ce40 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.client.api.DataStreamApi;
@@ -42,22 +43,25 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -107,15 +111,28 @@ public final class RaftClientImpl implements RaftClient {
}
}
+ static class RaftPeerList implements Iterable<RaftPeer> {
+ private final AtomicReference<List<RaftPeer>> list = new AtomicReference<>();
+
+ @Override
+ public Iterator<RaftPeer> iterator() {
+ return list.get().iterator();
+ }
+
+ void set(Collection<RaftPeer> newPeers) {
+ list.set(Collections.unmodifiableList(new ArrayList<>(newPeers)));
+ }
+ }
+
private final ClientId clientId;
private final RaftClientRpc clientRpc;
- private final Collection<RaftPeer> peers;
+ private final RaftPeerList peers = new RaftPeerList();
private final RaftGroupId groupId;
private final RetryPolicy retryPolicy;
private volatile RaftPeerId leaderId;
- private final TimeoutScheduler scheduler;
+ private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final Supplier<OrderedAsync> orderedAsync;
private final Supplier<MessageStreamApi> streamApi;
@@ -126,22 +143,24 @@ public final class RaftClientImpl implements RaftClient {
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftPeer primaryDataStreamServer,
RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
this.clientId = clientId;
- this.clientRpc = clientRpc;
- this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
+ this.peers.set(group.getPeers());
this.groupId = group.getGroupId();
this.leaderId = leaderId != null? leaderId : getHighestPriorityPeerId();
- Preconditions.assertTrue(retryPolicy != null, "retry policy can't be null");
- this.retryPolicy = retryPolicy;
+ this.retryPolicy = Objects.requireNonNull(retryPolicy, "retry policy can't be null");
- scheduler = TimeoutScheduler.getInstance();
- clientRpc.addRaftPeers(peers);
+ clientRpc.addRaftPeers(group.getPeers());
+ this.clientRpc = clientRpc;
this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, properties));
this.streamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
- this.dataStreamApi = JavaUtils.memoize(
- () ->new DataStreamClientImpl(clientId, groupId, primaryDataStreamServer, properties, null));
+ this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder()
+ .setClientId(clientId)
+ .setRaftGroupId(groupId)
+ .setDataStreamServer(primaryDataStreamServer)
+ .setProperties(properties)
+ .build());
}
public RaftPeerId getLeaderId() {
@@ -153,10 +172,6 @@ public final class RaftClientImpl implements RaftClient {
}
private RaftPeerId getHighestPriorityPeerId() {
- if (peers == null) {
- return null;
- }
-
int maxPriority = Integer.MIN_VALUE;
RaftPeerId highestPriorityPeerId = null;
for (RaftPeer peer : peers) {
@@ -284,8 +299,7 @@ public final class RaftClientImpl implements RaftClient {
private void refreshPeers(Collection<RaftPeer> newPeers) {
if (newPeers != null && newPeers.size() > 0) {
- peers.clear();
- peers.addAll(newPeers);
+ peers.set(newPeers);
// also refresh the rpc proxies for these peers
clientRpc.addRaftPeers(newPeers);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index 9aa1702..ef052d1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,7 +24,7 @@ import java.util.*;
/**
* Description of a raft group, which has a unique {@link RaftGroupId} and a collection of {@link RaftPeer}.
*
- * This is a value-based class.
+ * The objects of this class are immutable.
*/
public final class RaftGroup {
private static final RaftGroup EMPTY_GROUP = new RaftGroup();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index 58e5a65..6e452f3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -25,10 +25,11 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
+import java.util.Optional;
import java.util.function.Supplier;
/**
- * A {@link RaftPeer} is a server in a Raft cluster.
+ * A {@link RaftPeer} contains the information of a server.
*
* The objects of this class are immutable.
*/
@@ -50,10 +51,60 @@ public class RaftPeer {
}
}
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private RaftPeerId id;
+ private String address;
+ private String dataStreamAddress;
+ private int priority;
+
+ public Builder setId(RaftPeerId id) {
+ this.id = id;
+ return this;
+ }
+
+ public Builder setAddress(String address) {
+ this.address = address;
+ return this;
+ }
+
+ public Builder setAddress(InetSocketAddress address) {
+ return setAddress(NetUtils.address2String(address));
+ }
+
+ public Builder setDataStreamAddress(String dataStreamAddress) {
+ this.dataStreamAddress = dataStreamAddress;
+ return this;
+ }
+
+ public Builder setDataStreamAddress(InetSocketAddress dataStreamAddress) {
+ return setDataStreamAddress(NetUtils.address2String(dataStreamAddress));
+ }
+
+ public Builder setPriority(int priority) {
+ if (priority < 0) {
+ throw new IllegalArgumentException("priority = " + priority + " < 0");
+ }
+ this.priority = priority;
+ return this;
+ }
+
+ public RaftPeer build() {
+ return new RaftPeer(
+ Objects.requireNonNull(id, "The 'id' field is not initialized."),
+ address, dataStreamAddress, priority);
+ }
+ }
+
/** The id of the peer. */
private final RaftPeerId id;
- /** The address of the peer. */
+ /** The RPC address of the peer. */
private final String address;
+ /** The DataStream address of the peer. */
+ private final String dataStreamAddress;
/** The priority of the peer. */
private final int priority;
@@ -76,8 +127,13 @@ public class RaftPeer {
/** Construct a peer with the given id, address, priority. */
public RaftPeer(RaftPeerId id, String address, int priority) {
+ this(id, address, null, priority);
+ }
+
+ private RaftPeer(RaftPeerId id, String address, String dataStreamAddress, int priority) {
this.id = Objects.requireNonNull(id, "id == null");
this.address = address;
+ this.dataStreamAddress = dataStreamAddress;
this.priority = priority;
this.raftPeerProto = JavaUtils.memoize(this::buildRaftPeerProto);
}
@@ -85,9 +141,8 @@ public class RaftPeer {
private RaftPeerProto buildRaftPeerProto() {
final RaftPeerProto.Builder builder = RaftPeerProto.newBuilder()
.setId(getId().toByteString());
- if (getAddress() != null) {
- builder.setAddress(getAddress());
- }
+ Optional.ofNullable(getAddress()).ifPresent(builder::setAddress);
+ Optional.ofNullable(getDataStreamAddress()).ifPresent(builder::setDataStreamAddress);
builder.setPriority(priority);
return builder.build();
}
@@ -97,11 +152,16 @@ public class RaftPeer {
return id;
}
- /** @return The address of the peer. */
+ /** @return The RPC address of the peer. */
public String getAddress() {
return address;
}
+ /** @return The data stream address of the peer. */
+ public String getDataStreamAddress() {
+ return dataStreamAddress;
+ }
+
/** @return The priority of the peer. */
public int getPriority() {
return priority;
@@ -113,7 +173,10 @@ public class RaftPeer {
@Override
public String toString() {
- return id + ":" + address + ":" + priority;
+ final String rpc = address != null? "|rpc:" + address: "";
+ final String data = dataStreamAddress != null? "|dataStream:" + dataStreamAddress: "";
+ final String p = priority > 0? "|p" + priority: "";
+ return id + rpc + data + p;
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
index 49b4186..6fe9802 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
@@ -123,6 +123,9 @@ public interface NetUtils {
}
static String address2String(InetSocketAddress address) {
+ if (address == null) {
+ return null;
+ }
final StringBuilder b = new StringBuilder(address.getHostName());
if (address.getAddress() instanceof Inet6Address) {
b.insert(0, '[').append(']');
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 97a8676..78b1d61 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -72,7 +72,12 @@ public interface ProtoUtils {
}
static RaftPeer toRaftPeer(RaftPeerProto p) {
- return new RaftPeer(RaftPeerId.valueOf(p.getId()), p.getAddress(), p.getPriority());
+ return RaftPeer.newBuilder()
+ .setId(RaftPeerId.valueOf(p.getId()))
+ .setAddress(p.getAddress())
+ .setDataStreamAddress(p.getDataStreamAddress())
+ .setPriority(p.getPriority())
+ .build();
}
static List<RaftPeer> toRaftPeers(List<RaftPeerProto> protos) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 3f4b791..b476bd3 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -63,7 +63,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
.channel(NioSocketChannel.class)
.handler(getInitializer())
.option(ChannelOption.SO_KEEPALIVE, true)
- .connect(NetUtils.createSocketAddr(server.getAddress()));
+ .connect(NetUtils.createSocketAddr(server.getDataStreamAddress()));
this.channel = JavaUtils.memoize(() -> f.syncUninterruptibly().channel());
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 6d3143a..e7fec64 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -237,7 +237,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
return DataStreamClient.newBuilder()
.setClientId(ClientId.randomId())
- .setRaftServer(peer)
+ .setDataStreamServer(peer)
.setProperties(properties)
.build();
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 18e88f2..f3517d6 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -23,8 +23,9 @@ package ratis.common;
message RaftPeerProto {
bytes id = 1; // id of the peer
- string address = 2; // e.g. IP address, hostname etc.
+ string address = 2; // e.g. address of the RPC server
uint32 priority = 3; // priority of the peer
+ string dataStreamAddress = 4; // address of the data stream server
}
message RaftGroupIdProto {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 4d50d7f..680e5de 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -24,7 +24,6 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.GroupInfoReply;
@@ -53,6 +52,7 @@ import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
@@ -320,7 +320,7 @@ abstract class DataStreamBaseTest extends BaseTest {
protected void setup(int numServers){
final List<RaftPeer> peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
.map(RaftPeerId::valueOf)
- .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
+ .map(id -> RaftPeer.newBuilder().setId(id).setDataStreamAddress(NetUtils.createLocalServerAddress()).build())
.collect(Collectors.toList());
List<RaftServer> raftServers = new ArrayList<>();
@@ -328,17 +328,8 @@ abstract class DataStreamBaseTest extends BaseTest {
setup(peers, raftServers);
}
- protected void setup(List<RaftServer> raftServers) {
- final List<RaftPeer> peers = new ArrayList<>();
- raftServers.forEach(raftServer ->
- peers.add(new RaftPeer(raftServer.getId(),
- NetUtils.createSocketAddrForHost("http://localhost",
- NettyConfigKeys.DataStream.port(raftServer.getProperties())))));
- setup(peers, raftServers);
- }
-
- private void setup(List<RaftPeer> peers, List<RaftServer> raftServers){
+ void setup(List<RaftPeer> peers, List<RaftServer> raftServers) {
raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
servers = new ArrayList<>(peers.size());
// start stream servers on raft peers.
@@ -379,16 +370,6 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
- protected void runTestCloseStream(List<RaftServer> raftServers, int bufferSize, int bufferNum,
- RaftClientReply expectedClientReply) throws Exception {
- try {
- setup(raftServers);
- runTestCloseStream(bufferSize, bufferNum, expectedClientReply);
- } finally {
- shutdown();
- }
- }
-
private void runTestDataStream(int numClients, int numStreams, int bufferSize, int bufferNum) throws Exception {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
final List<RaftClient> clients = new ArrayList<>();
@@ -404,13 +385,13 @@ abstract class DataStreamBaseTest extends BaseTest {
Assert.assertEquals(numClients*numStreams, futures.size());
futures.forEach(CompletableFuture::join);
} finally {
- for (int j = 0; j < numClients; j++) {
+ for (int j = 0; j < clients.size(); j++) {
clients.get(j).close();
}
}
}
- private void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
+ void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
throws IOException {
try (final RaftClient client = newRaftClientForDataStream()) {
final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 4b13599..8b65ba5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -35,14 +35,26 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestDataStreamNetty extends DataStreamBaseTest {
+ static RaftPeer newRaftPeer(RaftServer server) {
+ final InetSocketAddress rpc = NetUtils.createLocalServerAddress();
+ final int dataStreamPort = NettyConfigKeys.DataStream.port(server.getProperties());
+ return RaftPeer.newBuilder()
+ .setId(server.getId())
+ .setAddress(rpc)
+ .setDataStreamAddress(NetUtils.createSocketAddrForHost(rpc.getHostName(), dataStreamPort))
+ .build();
+ }
+
@Before
public void setup() {
properties = new RaftProperties();
@@ -52,7 +64,7 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
@Override
protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
final RaftProperties p = new RaftProperties(properties);
- NettyConfigKeys.DataStream.setPort(p, NetUtils.createSocketAddr(peer.getAddress()).getPort());
+ NettyConfigKeys.DataStream.setPort(p, NetUtils.createSocketAddr(peer.getDataStreamAddress()).getPort());
return super.newRaftServer(peer, p);
}
@@ -106,6 +118,19 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
runTestCloseStream(raftServers, 1_000_000, 10, expectedClientReply);
}
+ void runTestCloseStream(List<RaftServer> raftServers, int bufferSize, int bufferNum,
+ RaftClientReply expectedClientReply) throws Exception {
+ try {
+ final List<RaftPeer> peers = raftServers.stream()
+ .map(TestDataStreamNetty::newRaftPeer)
+ .collect(Collectors.toList());
+ setup(peers, raftServers);
+ runTestCloseStream(bufferSize, bufferNum, expectedClientReply);
+ } finally {
+ shutdown();
+ }
+ }
+
@Test
public void testCloseStreamPrimaryIsLeader() throws Exception {
// primary is 0, leader is 0