You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/09 04:05:16 UTC

[incubator-ratis] branch master updated: RATIS-1138. Add dataStreamAddress to RaftPeer. (#262)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 311229c  RATIS-1138. Add dataStreamAddress to RaftPeer. (#262)
311229c is described below

commit 311229c5428cfa62d6db2d3dc56973a2b8833678
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 9 12:05:06 2020 +0800

    RATIS-1138. Add dataStreamAddress to RaftPeer. (#262)
---
 .../org/apache/ratis/client/DataStreamClient.java  | 32 +++++++--
 .../apache/ratis/client/impl/ClientImplUtils.java  | 18 +++--
 .../ratis/client/impl/DataStreamClientImpl.java    | 25 +++----
 .../apache/ratis/client/impl/RaftClientImpl.java   | 50 +++++++++-----
 .../java/org/apache/ratis/protocol/RaftGroup.java  |  4 +-
 .../java/org/apache/ratis/protocol/RaftPeer.java   | 77 ++++++++++++++++++++--
 .../main/java/org/apache/ratis/util/NetUtils.java  |  3 +
 .../java/org/apache/ratis/util/ProtoUtils.java     |  7 +-
 .../ratis/netty/client/NettyClientStreamRpc.java   |  2 +-
 .../ratis/netty/server/NettyServerStreamRpc.java   |  2 +-
 ratis-proto/src/main/proto/Raft.proto              |  3 +-
 .../ratis/datastream/DataStreamBaseTest.java       | 29 ++------
 .../ratis/datastream/TestDataStreamNetty.java      | 27 +++++++-
 13 files changed, 193 insertions(+), 86 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index 0bc0afc..b00f182 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -17,9 +17,11 @@
  */
 package org.apache.ratis.client;
 
-import org.apache.ratis.client.impl.DataStreamClientImpl;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.impl.ClientImplUtils;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
@@ -27,6 +29,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.Objects;
+import java.util.Optional;
 
 /**
  * A user interface extending {@link DataStreamRpcApi}.
@@ -43,7 +47,8 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable {
 
   /** To build {@link DataStreamClient} objects */
   class Builder {
-    private RaftPeer raftServer;
+    private RaftPeer dataStreamServer;
+    private DataStreamClientRpc dataStreamClientRpc;
     private RaftProperties properties;
     private Parameters parameters;
     private RaftGroupId raftGroupId;
@@ -51,8 +56,18 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable {
 
     private Builder() {}
 
-    public DataStreamClientImpl build(){
-      return new DataStreamClientImpl(clientId, raftGroupId, raftServer, properties, parameters);
+    public DataStreamClient build() {
+      Objects.requireNonNull(dataStreamServer, "The 'dataStreamServer' field is not initialized.");
+      if (properties != null) {
+        if (dataStreamClientRpc == null) {
+          final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
+          dataStreamClientRpc = DataStreamClientFactory.newInstance(type, parameters)
+              .newDataStreamClientRpc(dataStreamServer, properties);
+        }
+      }
+      return ClientImplUtils.newDataStreamClient(
+          Optional.ofNullable(clientId).orElseGet(ClientId::randomId),
+          raftGroupId, dataStreamServer, dataStreamClientRpc, properties);
     }
 
     public Builder setClientId(ClientId clientId) {
@@ -65,8 +80,13 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable {
       return this;
     }
 
-    public Builder setRaftServer(RaftPeer peer) {
-      this.raftServer = peer;
+    public Builder setDataStreamServer(RaftPeer dataStreamServer) {
+      this.dataStreamServer = dataStreamServer;
+      return this;
+    }
+
+    public Builder setDataStreamClientRpc(DataStreamClientRpc dataStreamClientRpc) {
+      this.dataStreamClientRpc = dataStreamClientRpc;
       return this;
     }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 2222ece..eeddb22 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,25 +17,29 @@
  */
 package org.apache.ratis.client.impl;
 
+import org.apache.ratis.client.DataStreamClient;
+import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeerId;
 
 /** Client utilities for internal use. */
-public final class ClientImplUtils {
-  private ClientImplUtils() {
-
-  }
-
-  public static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
+public interface ClientImplUtils {
+  static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
       RaftPeerId leaderId, RaftPeer primaryDataStreamServer, RaftClientRpc clientRpc, RaftProperties properties,
       RetryPolicy retryPolicy) {
     return new RaftClientImpl(clientId, group, leaderId, primaryDataStreamServer, clientRpc, properties,
         retryPolicy);
   }
+
+  static DataStreamClient newDataStreamClient(ClientId clientId, RaftGroupId groupId, RaftPeer primaryDataStreamServer,
+      DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+    return new DataStreamClientImpl(clientId, groupId, primaryDataStreamServer, dataStreamClientRpc, properties);
+  }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 6c07374..2798de6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -17,25 +17,20 @@
 */
 package org.apache.ratis.client.impl;
 
-import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.DataStreamClient;
-import org.apache.ratis.client.DataStreamClientFactory;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.DataStreamOutputRpc;
-import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.RaftPeer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -46,20 +41,16 @@ public class DataStreamClientImpl implements DataStreamClient {
   private final ClientId clientId;
   private final RaftGroupId groupId;
 
-  private final RaftPeer raftServer;
+  private final RaftPeer dataStreamServer;
   private final DataStreamClientRpc dataStreamClientRpc;
   private final OrderedStreamAsync orderedStreamAsync;
 
-  public DataStreamClientImpl(
-      ClientId clientId, RaftGroupId groupId, RaftPeer server, RaftProperties properties, Parameters parameters) {
+  DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer dataStreamServer,
+      DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
     this.clientId = clientId;
     this.groupId = groupId;
-    this.raftServer = Objects.requireNonNull(server, "server == null");
-
-    final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
-    this.dataStreamClientRpc = DataStreamClientFactory.newInstance(type, parameters)
-                               .newDataStreamClientRpc(raftServer, properties);
-
+    this.dataStreamServer = dataStreamServer;
+    this.dataStreamClientRpc = dataStreamClientRpc;
     this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
   }
 
@@ -124,7 +115,7 @@ public class DataStreamClientImpl implements DataStreamClient {
   @Override
   public DataStreamOutputRpc stream() {
     RaftClientRequest request = new RaftClientRequest(
-        clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
+        clientId, dataStreamServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
     return new DataStreamOutputImpl(request);
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 62baac5..d28ce40 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.client.impl;
 
+import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.client.api.DataStreamApi;
@@ -42,22 +43,25 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutScheduler;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -107,15 +111,28 @@ public final class RaftClientImpl implements RaftClient {
     }
   }
 
+  static class RaftPeerList implements Iterable<RaftPeer> {
+    private final AtomicReference<List<RaftPeer>> list = new AtomicReference<>();
+
+    @Override
+    public Iterator<RaftPeer> iterator() {
+      return list.get().iterator();
+    }
+
+    void set(Collection<RaftPeer> newPeers) {
+      list.set(Collections.unmodifiableList(new ArrayList<>(newPeers)));
+    }
+  }
+
   private final ClientId clientId;
   private final RaftClientRpc clientRpc;
-  private final Collection<RaftPeer> peers;
+  private final RaftPeerList peers = new RaftPeerList();
   private final RaftGroupId groupId;
   private final RetryPolicy retryPolicy;
 
   private volatile RaftPeerId leaderId;
 
-  private final TimeoutScheduler scheduler;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
   private final Supplier<OrderedAsync> orderedAsync;
   private final Supplier<MessageStreamApi> streamApi;
@@ -126,22 +143,24 @@ public final class RaftClientImpl implements RaftClient {
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftPeer primaryDataStreamServer,
       RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
     this.clientId = clientId;
-    this.clientRpc = clientRpc;
-    this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
+    this.peers.set(group.getPeers());
     this.groupId = group.getGroupId();
     this.leaderId = leaderId != null? leaderId : getHighestPriorityPeerId();
-    Preconditions.assertTrue(retryPolicy != null, "retry policy can't be null");
-    this.retryPolicy = retryPolicy;
+    this.retryPolicy = Objects.requireNonNull(retryPolicy, "retry policy can't be null");
 
-    scheduler = TimeoutScheduler.getInstance();
-    clientRpc.addRaftPeers(peers);
+    clientRpc.addRaftPeers(group.getPeers());
+    this.clientRpc = clientRpc;
 
     this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, properties));
     this.streamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
     this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
     this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
-    this.dataStreamApi = JavaUtils.memoize(
-        () ->new DataStreamClientImpl(clientId, groupId, primaryDataStreamServer, properties, null));
+    this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder()
+        .setClientId(clientId)
+        .setRaftGroupId(groupId)
+        .setDataStreamServer(primaryDataStreamServer)
+        .setProperties(properties)
+        .build());
   }
 
   public RaftPeerId getLeaderId() {
@@ -153,10 +172,6 @@ public final class RaftClientImpl implements RaftClient {
   }
 
   private RaftPeerId getHighestPriorityPeerId() {
-    if (peers == null) {
-      return null;
-    }
-
     int maxPriority = Integer.MIN_VALUE;
     RaftPeerId highestPriorityPeerId = null;
     for (RaftPeer peer : peers) {
@@ -284,8 +299,7 @@ public final class RaftClientImpl implements RaftClient {
 
   private void refreshPeers(Collection<RaftPeer> newPeers) {
     if (newPeers != null && newPeers.size() > 0) {
-      peers.clear();
-      peers.addAll(newPeers);
+      peers.set(newPeers);
       // also refresh the rpc proxies for these peers
       clientRpc.addRaftPeers(newPeers);
     }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index 9aa1702..ef052d1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,7 +24,7 @@ import java.util.*;
 /**
  * Description of a raft group, which has a unique {@link RaftGroupId} and a collection of {@link RaftPeer}.
  *
- * This is a value-based class.
+ * The objects of this class are immutable.
  */
 public final class RaftGroup {
   private static final RaftGroup EMPTY_GROUP = new RaftGroup();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index 58e5a65..6e452f3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -25,10 +25,11 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.function.Supplier;
 
 /**
- * A {@link RaftPeer} is a server in a Raft cluster.
+ * A {@link RaftPeer} contains the information of a server.
  *
  * The objects of this class are immutable.
  */
@@ -50,10 +51,60 @@ public class RaftPeer {
     }
   }
 
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private RaftPeerId id;
+    private String address;
+    private String dataStreamAddress;
+    private int priority;
+
+    public Builder setId(RaftPeerId id) {
+      this.id = id;
+      return this;
+    }
+
+    public Builder setAddress(String address) {
+      this.address = address;
+      return this;
+    }
+
+    public Builder setAddress(InetSocketAddress address) {
+      return setAddress(NetUtils.address2String(address));
+    }
+
+    public Builder setDataStreamAddress(String dataStreamAddress) {
+      this.dataStreamAddress = dataStreamAddress;
+      return this;
+    }
+
+    public Builder setDataStreamAddress(InetSocketAddress dataStreamAddress) {
+      return setDataStreamAddress(NetUtils.address2String(dataStreamAddress));
+    }
+
+    public Builder setPriority(int priority) {
+      if (priority < 0) {
+        throw new IllegalArgumentException("priority = " + priority + " < 0");
+      }
+      this.priority = priority;
+      return this;
+    }
+
+    public RaftPeer build() {
+      return new RaftPeer(
+          Objects.requireNonNull(id, "The 'id' field is not initialized."),
+          address, dataStreamAddress, priority);
+    }
+  }
+
   /** The id of the peer. */
   private final RaftPeerId id;
-  /** The address of the peer. */
+  /** The RPC address of the peer. */
   private final String address;
+  /** The DataStream address of the peer. */
+  private final String dataStreamAddress;
   /** The priority of the peer. */
   private final int priority;
 
@@ -76,8 +127,13 @@ public class RaftPeer {
 
   /** Construct a peer with the given id, address, priority. */
   public RaftPeer(RaftPeerId id, String address, int priority) {
+    this(id, address, null, priority);
+  }
+
+  private RaftPeer(RaftPeerId id, String address, String dataStreamAddress, int priority) {
     this.id = Objects.requireNonNull(id, "id == null");
     this.address = address;
+    this.dataStreamAddress = dataStreamAddress;
     this.priority = priority;
     this.raftPeerProto = JavaUtils.memoize(this::buildRaftPeerProto);
   }
@@ -85,9 +141,8 @@ public class RaftPeer {
   private RaftPeerProto buildRaftPeerProto() {
     final RaftPeerProto.Builder builder = RaftPeerProto.newBuilder()
         .setId(getId().toByteString());
-    if (getAddress() != null) {
-      builder.setAddress(getAddress());
-    }
+    Optional.ofNullable(getAddress()).ifPresent(builder::setAddress);
+    Optional.ofNullable(getDataStreamAddress()).ifPresent(builder::setDataStreamAddress);
     builder.setPriority(priority);
     return builder.build();
   }
@@ -97,11 +152,16 @@ public class RaftPeer {
     return id;
   }
 
-  /** @return The address of the peer. */
+  /** @return The RPC address of the peer. */
   public String getAddress() {
     return address;
   }
 
+  /** @return The data stream address of the peer. */
+  public String getDataStreamAddress() {
+    return dataStreamAddress;
+  }
+
   /** @return The priority of the peer. */
   public int getPriority() {
     return priority;
@@ -113,7 +173,10 @@ public class RaftPeer {
 
   @Override
   public String toString() {
-    return id + ":" + address + ":" + priority;
+    final String rpc = address != null? "|rpc:" + address: "";
+    final String data = dataStreamAddress != null? "|dataStream:" + dataStreamAddress: "";
+    final String p = priority > 0? "|p" +  priority: "";
+    return id + rpc + data + p;
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
index 49b4186..6fe9802 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
@@ -123,6 +123,9 @@ public interface NetUtils {
   }
 
   static String address2String(InetSocketAddress address) {
+    if (address == null) {
+      return null;
+    }
     final StringBuilder b = new StringBuilder(address.getHostName());
     if (address.getAddress() instanceof Inet6Address) {
       b.insert(0, '[').append(']');
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 97a8676..78b1d61 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -72,7 +72,12 @@ public interface ProtoUtils {
   }
 
   static RaftPeer toRaftPeer(RaftPeerProto p) {
-    return new RaftPeer(RaftPeerId.valueOf(p.getId()), p.getAddress(), p.getPriority());
+    return RaftPeer.newBuilder()
+        .setId(RaftPeerId.valueOf(p.getId()))
+        .setAddress(p.getAddress())
+        .setDataStreamAddress(p.getDataStreamAddress())
+        .setPriority(p.getPriority())
+        .build();
   }
 
   static List<RaftPeer> toRaftPeers(List<RaftPeerProto> protos) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 3f4b791..b476bd3 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -63,7 +63,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         .channel(NioSocketChannel.class)
         .handler(getInitializer())
         .option(ChannelOption.SO_KEEPALIVE, true)
-        .connect(NetUtils.createSocketAddr(server.getAddress()));
+        .connect(NetUtils.createSocketAddr(server.getDataStreamAddress()));
     this.channel = JavaUtils.memoize(() -> f.syncUninterruptibly().channel());
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 6d3143a..e7fec64 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -237,7 +237,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
     return DataStreamClient.newBuilder()
         .setClientId(ClientId.randomId())
-        .setRaftServer(peer)
+        .setDataStreamServer(peer)
         .setProperties(properties)
         .build();
   }
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 18e88f2..f3517d6 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -23,8 +23,9 @@ package ratis.common;
 
 message RaftPeerProto {
   bytes id = 1;      // id of the peer
-  string address = 2; // e.g. IP address, hostname etc.
+  string address = 2; // e.g. address of the RPC server
   uint32 priority = 3; // priority of the peer
+  string dataStreamAddress = 4; // address of the data stream server
 }
 
 message RaftGroupIdProto {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 4d50d7f..680e5de 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -24,7 +24,6 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.GroupInfoReply;
@@ -53,6 +52,7 @@ import org.apache.ratis.util.NetUtils;
 import org.junit.Assert;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
@@ -320,7 +320,7 @@ abstract class DataStreamBaseTest extends BaseTest {
   protected void setup(int numServers){
     final List<RaftPeer> peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
         .map(RaftPeerId::valueOf)
-        .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
+        .map(id -> RaftPeer.newBuilder().setId(id).setDataStreamAddress(NetUtils.createLocalServerAddress()).build())
         .collect(Collectors.toList());
 
     List<RaftServer> raftServers = new ArrayList<>();
@@ -328,17 +328,8 @@ abstract class DataStreamBaseTest extends BaseTest {
     setup(peers, raftServers);
   }
 
-  protected void setup(List<RaftServer> raftServers) {
-    final List<RaftPeer> peers = new ArrayList<>();
-    raftServers.forEach(raftServer ->
-        peers.add(new RaftPeer(raftServer.getId(),
-            NetUtils.createSocketAddrForHost("http://localhost",
-                NettyConfigKeys.DataStream.port(raftServer.getProperties())))));
 
-    setup(peers, raftServers);
-  }
-
-  private void setup(List<RaftPeer> peers, List<RaftServer> raftServers){
+  void setup(List<RaftPeer> peers, List<RaftServer> raftServers) {
     raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
     servers = new ArrayList<>(peers.size());
     // start stream servers on raft peers.
@@ -379,16 +370,6 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
-  protected void runTestCloseStream(List<RaftServer> raftServers, int bufferSize, int bufferNum,
-      RaftClientReply expectedClientReply) throws Exception {
-    try {
-      setup(raftServers);
-      runTestCloseStream(bufferSize, bufferNum, expectedClientReply);
-    } finally {
-      shutdown();
-    }
-  }
-
   private void runTestDataStream(int numClients, int numStreams, int bufferSize, int bufferNum) throws Exception {
     final List<CompletableFuture<Void>> futures = new ArrayList<>();
     final List<RaftClient> clients = new ArrayList<>();
@@ -404,13 +385,13 @@ abstract class DataStreamBaseTest extends BaseTest {
       Assert.assertEquals(numClients*numStreams, futures.size());
       futures.forEach(CompletableFuture::join);
     } finally {
-      for (int j = 0; j < numClients; j++) {
+      for (int j = 0; j < clients.size(); j++) {
         clients.get(j).close();
       }
     }
   }
 
-  private void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
+  void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
       throws IOException {
     try (final RaftClient client = newRaftClientForDataStream()) {
       final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 4b13599..8b65ba5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -35,14 +35,26 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestDataStreamNetty extends DataStreamBaseTest {
+  static RaftPeer newRaftPeer(RaftServer server) {
+    final InetSocketAddress rpc = NetUtils.createLocalServerAddress();
+    final int dataStreamPort = NettyConfigKeys.DataStream.port(server.getProperties());
+    return RaftPeer.newBuilder()
+        .setId(server.getId())
+        .setAddress(rpc)
+        .setDataStreamAddress(NetUtils.createSocketAddrForHost(rpc.getHostName(), dataStreamPort))
+        .build();
+  }
+
   @Before
   public void setup() {
     properties = new RaftProperties();
@@ -52,7 +64,7 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
   @Override
   protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
     final RaftProperties p = new RaftProperties(properties);
-    NettyConfigKeys.DataStream.setPort(p,  NetUtils.createSocketAddr(peer.getAddress()).getPort());
+    NettyConfigKeys.DataStream.setPort(p,  NetUtils.createSocketAddr(peer.getDataStreamAddress()).getPort());
     return super.newRaftServer(peer, p);
   }
 
@@ -106,6 +118,19 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
     runTestCloseStream(raftServers, 1_000_000, 10, expectedClientReply);
   }
 
+  void runTestCloseStream(List<RaftServer> raftServers, int bufferSize, int bufferNum,
+      RaftClientReply expectedClientReply) throws Exception {
+    try {
+      final List<RaftPeer> peers = raftServers.stream()
+          .map(TestDataStreamNetty::newRaftPeer)
+          .collect(Collectors.toList());
+      setup(peers, raftServers);
+      runTestCloseStream(bufferSize, bufferNum, expectedClientReply);
+    } finally {
+      shutdown();
+    }
+  }
+
   @Test
   public void testCloseStreamPrimaryIsLeader() throws Exception {
     // primary is 0, leader is 0