You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/21 08:57:19 UTC

[incubator-ratis] branch master updated: RATIS-1248. Support network topology (#364)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d26861e  RATIS-1248. Support network topology (#364)
d26861e is described below

commit d26861eca140288bc8e75757e738ab5d128522d2
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Mon Dec 21 16:57:09 2020 +0800

    RATIS-1248. Support network topology (#364)
---
 .../org/apache/ratis/client/api/DataStreamApi.java |  5 ++
 .../apache/ratis/client/impl/ClientProtoUtils.java | 24 +++++-
 .../ratis/client/impl/DataStreamClientImpl.java    |  8 +-
 .../apache/ratis/protocol/RaftClientRequest.java   | 17 ++++-
 .../java/org/apache/ratis/protocol/RaftPeerId.java | 16 ++++
 .../org/apache/ratis/protocol/RoutingTable.java    | 88 ++++++++++++++++++++++
 .../java/org/apache/ratis/util/ProtoUtils.java     | 50 ++++++++++++
 .../src/test/java/org/apache/ratis/BaseTest.java   | 26 +++++++
 .../filestore/FileStoreStreamingBaseTest.java      | 12 +--
 .../ratis/examples/filestore/FileStoreWriter.java  |  2 +-
 .../ratis/netty/server/DataStreamManagement.java   | 63 +++++++++-------
 .../ratis/netty/server/NettyServerStreamRpc.java   |  9 ++-
 ratis-proto/src/main/proto/Raft.proto              | 15 ++++
 .../apache/ratis/server/impl/MiniRaftCluster.java  |  3 -
 .../datastream/DataStreamAsyncClusterTests.java    |  4 +-
 .../ratis/datastream/DataStreamBaseTest.java       |  5 +-
 .../ratis/datastream/DataStreamClusterTests.java   | 14 +++-
 ...ttyDataStreamChainTopologyWithGrpcCluster.java} |  2 +-
 ...ettyDataStreamStarTopologyWithGrpcCluster.java} | 20 ++++-
 .../datastream/TestNettyDataStreamWithMock.java    |  1 +
 20 files changed, 329 insertions(+), 55 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
index 81c84b7..7250b4b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.client.api;
 
+import org.apache.ratis.protocol.RoutingTable;
+
 import java.nio.ByteBuffer;
 
 /**
@@ -44,4 +46,7 @@ public interface DataStreamApi {
 
   /** Create a stream by providing a customized header message. */
   DataStreamOutput stream(ByteBuffer headerMessage);
+
+  /** Create a stream by providing a customized header message and a routing table. */
+  DataStreamOutput stream(ByteBuffer headerMessage, RoutingTable routingTable);
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index ae53ab0..2342ba3 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -115,9 +115,26 @@ public interface ClientProtoUtils {
     }
   }
 
+  static RoutingTable getRoutingTable(RaftClientRequestProto p) {
+    if (!p.hasRoutingTable()) {
+      return null;
+    }
+
+    RoutingTable.Builder builder = RoutingTable.newBuilder();
+    for (RouteProto routeProto : p.getRoutingTable().getRoutesList()) {
+      RaftPeerId from = RaftPeerId.valueOf(routeProto.getPeerId().getId());
+      List<RaftPeerId> to = routeProto.getSuccessorsList().stream()
+          .map(v -> RaftPeerId.valueOf(v.getId())).collect(Collectors.toList());
+      builder.addSuccessors(from, to);
+    }
+
+    return builder.build();
+  }
+
   static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
     final RaftClientRequest.Type type = toRaftClientRequestType(p);
     final RaftRpcRequestProto request = p.getRpcRequest();
+
     return new RaftClientRequest(
         ClientId.valueOf(request.getRequestorId()),
         RaftPeerId.valueOf(request.getReplyId()),
@@ -125,7 +142,8 @@ public interface ClientProtoUtils {
         request.getCallId(),
         toMessage(p.getMessage()),
         type,
-        request.getSlidingWindowEntry());
+        request.getSlidingWindowEntry(),
+        getRoutingTable(p));
   }
 
   static ByteBuffer toRaftClientRequestProtoByteBuffer(RaftClientRequest request) {
@@ -139,6 +157,10 @@ public interface ClientProtoUtils {
       b.setMessage(toClientMessageEntryProtoBuilder(request.getMessage()));
     }
 
+    if (request.getRoutingTable() != null) {
+      b.setRoutingTable(request.getRoutingTable().toProto());
+    }
+
     final RaftClientRequest.Type type = request.getType();
     switch (type.getTypeCase()) {
       case WRITE:
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index a893d85..35dd56c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -183,10 +183,16 @@ public class DataStreamClientImpl implements DataStreamClient {
 
   @Override
   public DataStreamOutputRpc stream(ByteBuffer headerMessage) {
+    return stream(headerMessage, null);
+  }
+
+  @Override
+  public DataStreamOutputRpc stream(ByteBuffer headerMessage, RoutingTable routingTable) {
     final Message message =
         Optional.ofNullable(headerMessage).map(ByteString::copyFrom).map(Message::valueOf).orElse(null);
     RaftClientRequest request = new RaftClientRequest(clientId, dataStreamServer.getId(), groupId,
-        CallId.getAndIncrement(), message, RaftClientRequest.dataStreamRequestType(), null);
+        CallId.getAndIncrement(), message, RaftClientRequest.dataStreamRequestType(), null,
+        routingTable);
     return new DataStreamOutputImpl(request);
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 85c8620..1b745c9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -235,17 +235,28 @@ public class RaftClientRequest extends RaftClientMessage {
 
   private final SlidingWindowEntry slidingWindowEntry;
 
+  private final RoutingTable routingTable;
+
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
-    this(clientId, serverId, groupId, callId, null, type, null);
+    this(clientId, serverId, groupId, callId, null, type, null, null);
   }
 
   public RaftClientRequest(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
       long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry) {
+    this(clientId, serverId, groupId, callId, message, type, slidingWindowEntry, null);
+  }
+
+  @SuppressWarnings("parameternumber")
+  public RaftClientRequest(
+      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+      long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry,
+      RoutingTable routingTable) {
     super(clientId, serverId, groupId, callId);
     this.message = message;
     this.type = type;
     this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
+    this.routingTable = routingTable;
   }
 
   @Override
@@ -269,6 +280,10 @@ public class RaftClientRequest extends RaftClientMessage {
     return getType().is(typeCase);
   }
 
+  public RoutingTable getRoutingTable() {
+    return routingTable;
+  }
+
   @Override
   public String toString() {
     return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
index 1328e1a..5da8969 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
@@ -17,13 +17,16 @@
  */
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 
 /**
  * Id of Raft Peer which is globally unique.
@@ -51,15 +54,28 @@ public final class RaftPeerId {
   /** The corresponding bytes of {@link #idString}. */
   private final ByteString id;
 
+  private final Supplier<RaftPeerIdProto> raftPeerIdProto;
+
   private RaftPeerId(String id) {
     this.idString = Objects.requireNonNull(id, "id == null");
     this.id = ByteString.copyFrom(idString, StandardCharsets.UTF_8);
+    this.raftPeerIdProto = JavaUtils.memoize(this::buildRaftPeerIdProto);
   }
 
   private RaftPeerId(ByteString id) {
     this.id = Objects.requireNonNull(id, "id == null");
     Preconditions.assertTrue(id.size() > 0, "id is empty.");
     this.idString = id.toString(StandardCharsets.UTF_8);
+    this.raftPeerIdProto = JavaUtils.memoize(this::buildRaftPeerIdProto);
+  }
+
+  private RaftPeerIdProto buildRaftPeerIdProto() {
+    final RaftPeerIdProto.Builder builder = RaftPeerIdProto.newBuilder().setId(id);
+    return builder.build();
+  }
+
+  public RaftPeerIdProto getRaftPeerIdProto() {
+    return raftPeerIdProto.get();
   }
 
   /**
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
new file mode 100644
index 0000000..2083f42
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.proto.RaftProtos.RoutingTableProto;
+import org.apache.ratis.util.ProtoUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public interface RoutingTable {
+  Set<RaftPeerId> getSuccessors(RaftPeerId peerId);
+
+  RoutingTableProto toProto();
+
+  static Builder newBuilder() {
+    return new Builder();
+  }
+
+
+  class Builder {
+    private final AtomicReference<Map<RaftPeerId, Set<RaftPeerId>>> ref = new AtomicReference<>(new HashMap<>());
+
+    private Builder() {}
+
+    private Set<RaftPeerId> computeIfAbsent(RaftPeerId peerId) {
+      return Optional.ofNullable(ref.get())
+          .map(map -> map.computeIfAbsent(peerId, key -> new HashSet<>()))
+          .orElseThrow(() -> new IllegalStateException("Already built"));
+    }
+
+    public Builder addSuccessor(RaftPeerId peerId, RaftPeerId successor) {
+      computeIfAbsent(peerId).add(successor);
+      return this;
+    }
+
+    public Builder addSuccessors(RaftPeerId peerId, Collection<RaftPeerId> successors) {
+      computeIfAbsent(peerId).addAll(successors);
+      return this;
+    }
+
+    public Builder addSuccessors(RaftPeerId peerId, RaftPeerId... successors) {
+      return addSuccessors(peerId, Arrays.asList(successors));
+    }
+
+    public RoutingTable build() {
+      return Optional.ofNullable(ref.getAndSet(null))
+          .map(RoutingTable::newRoutingTable)
+          .orElseThrow(() -> new IllegalStateException("RoutingTable Already built"));
+    }
+  }
+
+  static RoutingTable newRoutingTable(Map<RaftPeerId, Set<RaftPeerId>> map) {
+    return new RoutingTable() {
+      @Override
+      public Set<RaftPeerId> getSuccessors(RaftPeerId peerId) {
+        return Collections.unmodifiableSet(map.getOrDefault(peerId, Collections.emptySet())); // read-only view
+      }
+
+      @Override
+      public RoutingTableProto toProto() {
+        return RoutingTableProto.newBuilder().addAllRoutes(ProtoUtils.toRouteProtos(map)).build();
+      }
+    };
+  }
+}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 2055869..d36141c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -17,7 +17,9 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.RouteProto;
 import org.apache.ratis.proto.RaftProtos.ThrowableProto;
 import org.apache.ratis.proto.RaftProtos.RaftGroupIdProto;
 import org.apache.ratis.proto.RaftProtos.RaftGroupMemberIdProto;
@@ -39,7 +41,9 @@ import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -121,6 +125,14 @@ public interface ProtoUtils {
     return protos.stream().map(ProtoUtils::toRaftPeer).collect(Collectors.toList());
   }
 
+  static RaftPeerId toRaftPeerId(RaftPeerIdProto p) {
+    return RaftPeerId.valueOf(p.getId());
+  }
+
+  static List<RaftPeerId> toRaftPeerIds(List<RaftPeerIdProto> protos) {
+    return protos.stream().map(ProtoUtils::toRaftPeerId).collect(Collectors.toList());
+  }
+
   static Iterable<RaftPeerProto> toRaftPeerProtos(
       final Collection<RaftPeer> peers) {
     return () -> new Iterator<RaftPeerProto>() {
@@ -138,6 +150,44 @@ public interface ProtoUtils {
     };
   }
 
+  static Iterable<RaftPeerIdProto> toRaftPeerIdProtos(
+      final Collection<RaftPeerId> peers) {
+    return () -> new Iterator<RaftPeerIdProto>() {
+      private final Iterator<RaftPeerId> i = peers.iterator();
+
+      @Override
+      public boolean hasNext() {
+        return i.hasNext();
+      }
+
+      @Override
+      public RaftPeerIdProto next() {
+        return i.next().getRaftPeerIdProto();
+      }
+    };
+  }
+
+  static Iterable<RouteProto> toRouteProtos(
+      final Map<RaftPeerId, Set<RaftPeerId>> routingTable) {
+    return () -> new Iterator<RouteProto>() {
+      private final Iterator<Map.Entry<RaftPeerId, Set<RaftPeerId>>> i = routingTable.entrySet().iterator();
+
+      @Override
+      public boolean hasNext() {
+        return i.hasNext();
+      }
+
+      @Override
+      public RouteProto next() {
+        Map.Entry<RaftPeerId, Set<RaftPeerId>> entry = i.next();
+        return RouteProto.newBuilder()
+            .setPeerId(entry.getKey().getRaftPeerIdProto())
+            .addAllSuccessors(toRaftPeerIdProtos(entry.getValue()))
+            .build();
+      }
+    };
+  }
+
   static RaftGroupId toRaftGroupId(RaftGroupIdProto proto) {
     return RaftGroupId.valueOf(proto.getId());
   }
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 6267c3a..3b3d8cc 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -19,10 +19,15 @@ package org.apache.ratis;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.proto.RaftProtos.RoutingTableProto;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedRunnable;
 import org.junit.After;
@@ -35,7 +40,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
@@ -65,6 +77,20 @@ public abstract class BaseTest {
     }
   }
 
+  public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
+    RoutingTable.Builder builder = RoutingTable.newBuilder();
+    RaftPeer previous = primary;
+    for (RaftPeer peer : peers) {
+      if (peer.equals(primary)) {
+        continue;
+      }
+      builder.addSuccessor(previous.getId(), peer.getId());
+      previous = peer;
+    }
+
+    return builder.build();
+  }
+
   @After
   public void assertNoFailures() {
     final Throwable e = firstException.get();
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
index 598dc0c..59a83c5 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
@@ -67,10 +67,10 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
     final RaftGroup raftGroup = cluster.getGroup();
     final Collection<RaftPeer> peers = raftGroup.getPeers();
     Assert.assertEquals(NUM_PEERS, peers.size());
-    RaftPeer raftPeer = peers.iterator().next();
+    RaftPeer primary = peers.iterator().next();
 
     final CheckedSupplier<FileStoreClient, IOException> newClient =
-        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
 
     testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient);
     testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient);
@@ -88,10 +88,10 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
     final RaftGroup raftGroup = cluster.getGroup();
     final Collection<RaftPeer> peers = raftGroup.getPeers();
     Assert.assertEquals(NUM_PEERS, peers.size());
-    RaftPeer raftPeer = peers.iterator().next();
+    RaftPeer primary = peers.iterator().next();
 
     final CheckedSupplier<FileStoreClient, IOException> newClient =
-        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
 
     testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient);
     testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient);
@@ -133,8 +133,4 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
       future.get();
     }
   }
-
-  static class StreamWriter {
-
-  }
 }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
index 991ce82..530a618 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -134,7 +134,7 @@ class FileStoreWriter implements Closeable {
     return this;
   }
 
-  public FileStoreWriter streamWriteAndVerify() throws IOException {
+  public FileStoreWriter streamWriteAndVerify() {
     final int size = fileSize.getSizeInt();
     final DataStreamOutput dataStreamOutput = client.getStreamOutput(fileName, size);
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 121c69e..2bde5f0 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -34,8 +34,11 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
 import org.apache.ratis.protocol.exceptions.DataStreamException;
+import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServer.Division;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -56,6 +59,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -82,11 +86,6 @@ public class DataStreamManagement {
       return composeAsync(writeFuture, executor,
           n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, options, stream), executor));
     }
-
-    CompletableFuture<Long> close(Executor executor) {
-      return composeAsync(writeFuture, executor,
-          n -> streamFuture.thenApplyAsync(DataStreamManagement::close, executor));
-    }
   }
 
   static class RemoteStream {
@@ -99,26 +98,27 @@ public class DataStreamManagement {
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
       return out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions());
     }
-
-    CompletableFuture<DataStreamReply> close() {
-      return out.closeAsync();
-    }
   }
 
   static class StreamInfo {
     private final RaftClientRequest request;
     private final boolean primary;
     private final LocalStream local;
-    private final List<RemoteStream> remotes;
+    private final Set<RemoteStream> remotes;
+    private final RaftServer server;
     private final AtomicReference<CompletableFuture<Void>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
-    StreamInfo(RaftClientRequest request, boolean primary,
-        CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
+    StreamInfo(RaftClientRequest request, boolean primary, CompletableFuture<DataStream> stream, RaftServer server,
+        CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams)
+        throws IOException {
       this.request = request;
       this.primary = primary;
       this.local = new LocalStream(stream);
-      this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toList());
+      this.server = server;
+      final Set<RaftPeer> successors = getSuccessors(server.getId());
+      final Set<DataStreamOutputRpc> outs = getStreams.apply(request, successors);
+      this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toSet());
     }
 
     AtomicReference<CompletableFuture<Void>> getPrevious() {
@@ -145,6 +145,26 @@ public class DataStreamManagement {
     public String toString() {
       return JavaUtils.getClassSimpleName(getClass()) + ":" + request;
     }
+
+    private Set<RaftPeer> getSuccessors(RaftPeerId peerId) throws IOException {
+      final RaftGroupId groupId = request.getRaftGroupId();
+      final RaftConfiguration conf = server.getDivision(groupId).getRaftConf();
+      final RoutingTable routingTable = request.getRoutingTable();
+
+      if (routingTable != null) {
+        return routingTable.getSuccessors(peerId).stream().map(conf::getPeer).collect(Collectors.toSet());
+      }
+
+      if (isPrimary()) {
+        // Default star topology: the primary forwards directly to every other peer
+        // in the current configuration (all current peers except this server).
+        return conf.getCurrentPeers().stream()
+            .filter(p -> !p.getId().equals(server.getId()))
+            .collect(Collectors.toSet());
+      }
+
+      return Collections.emptySet();
+    }
   }
 
   static class StreamMap {
@@ -232,23 +252,12 @@ public class DataStreamManagement {
   }
 
   private StreamInfo newStreamInfo(ByteBuf buf,
-      CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
+      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
     try {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final boolean isPrimary = server.getId().equals(request.getServerId());
-      final List<DataStreamOutputRpc> outs;
-      if (isPrimary) {
-        final RaftGroupId groupId = request.getRaftGroupId();
-        // get the other peers from the current configuration
-        final List<RaftPeer> others = server.getDivision(groupId).getRaftConf().getCurrentPeers().stream()
-            .filter(p -> !p.getId().equals(server.getId()))
-            .collect(Collectors.toList());
-        outs = getStreams.apply(request, others);
-      } else {
-        outs = Collections.emptyList();
-      }
-      return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request), outs);
+      return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request), server, getStreams);
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
@@ -362,7 +371,7 @@ public class DataStreamManagement {
   }
 
   void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
-      CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
+      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
     boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 82811b7..a653d9b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -54,10 +54,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -77,8 +78,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       map.addRaftPeers(newPeers);
     }
 
-    List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, List<RaftPeer> peers) throws IOException {
-      final List<DataStreamOutputRpc> outs = new ArrayList<>();
+    Set<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers) throws IOException {
+      final Set<DataStreamOutputRpc> outs = new HashSet<>();
       try {
         getDataStreamOutput(request, peers, outs);
       } catch (IOException e) {
@@ -88,7 +89,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return outs;
     }
 
-    private void getDataStreamOutput(RaftClientRequest request, List<RaftPeer> peers, List<DataStreamOutputRpc> outs)
+    private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers, Set<DataStreamOutputRpc> outs)
         throws IOException {
       for (RaftPeer peer : peers) {
         try {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index f380581..fdb186e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -28,6 +28,10 @@ message RaftPeerProto {
   string dataStreamAddress = 4; // address of the data stream server
 }
 
+message RaftPeerIdProto {
+  bytes id = 1;      // id of the peer
+}
+
 message RaftGroupIdProto {
   bytes id = 1;
 }
@@ -280,6 +284,15 @@ message WatchRequestTypeProto {
   ReplicationLevel replication = 2;
 }
 
+message RouteProto {
+  RaftPeerIdProto peerId = 1;
+  repeated RaftPeerIdProto successors = 2;
+}
+
+message RoutingTableProto {
+  repeated RouteProto routes = 1;
+}
+
 // normal client request
 message RaftClientRequestProto {
   RaftRpcRequestProto rpcRequest = 1;
@@ -294,6 +307,8 @@ message RaftClientRequestProto {
     DataStreamRequestTypeProto dataStream = 8;
     ForwardRequestTypeProto forward = 9;
   }
+
+  RoutingTableProto routingTable = 10;
 }
 
 message DataStreamPacketHeaderProto {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 254732d..bd638a2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -690,9 +690,6 @@ public abstract class MiniRaftCluster implements Closeable {
   }
 
   public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, RetryPolicy retryPolicy, RaftPeer primaryServer) {
-    if (primaryServer == null) {
-      primaryServer = CollectionUtils.random(group.getPeers());
-    }
     RaftClient.Builder builder = RaftClient.newBuilder()
         .setRaftGroup(group)
         .setLeaderId(leaderId)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 869da26..fc61148 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -37,7 +37,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -147,7 +146,8 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
     try(RaftClient client = cluster.createClient(primaryServer)) {
       ClientId primaryClientId = getPrimaryClientId(cluster, primaryServer);
       for (int i = 0; i < numStreams; i++) {
-        final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
+        final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
+            .stream(null, getRoutingTable(cluster.getGroup().getPeers(), primaryServer));
         futures.add(CompletableFuture.supplyAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
             servers, leader, out, bufferSize, bufferNum, primaryClientId, cluster, stepDownLeader).join(), executor));
       }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 579a965..90d2690 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -195,6 +195,7 @@ abstract class DataStreamBaseTest extends BaseTest {
   protected RaftProperties properties;
 
   private List<Server> servers;
+  private List<RaftPeer> peers;
   private RaftGroup raftGroup;
 
   Server getPrimaryServer() {
@@ -373,6 +374,7 @@ abstract class DataStreamBaseTest extends BaseTest {
 
   void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
     raftGroup = RaftGroup.valueOf(groupId, peers);
+    this.peers = peers;
     servers = new ArrayList<>(peers.size());
     // start stream servers on raft peers.
     for (int i = 0; i < peers.size(); i++) {
@@ -420,7 +422,8 @@ abstract class DataStreamBaseTest extends BaseTest {
       Exception expectedException, Exception headerException)
       throws IOException {
     try (final RaftClient client = newRaftClientForDataStream(clientId)) {
-      final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
+      final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
+          .stream(null, getRoutingTable(peers, getPrimaryServer().getPeer()));
       if (headerException != null) {
         final DataStreamReply headerReply = out.getHeaderFuture().join();
         Assert.assertFalse(headerReply.isSuccess());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index d9b7f5e..c9625a6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
@@ -29,6 +30,7 @@ import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.Timestamp;
 import org.apache.ratis.util.function.CheckedConsumer;
 import org.junit.Assert;
@@ -72,8 +74,10 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
   void runTestDataStreamOutput(CLUSTER cluster) throws Exception {
     final RaftClientRequest request;
     final CompletableFuture<RaftClientReply> reply;
-    try (RaftClient client = cluster.createClient()) {
-      try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
+    final RaftPeer primaryServer = CollectionUtils.random(cluster.getGroup().getPeers());
+    try (RaftClient client = cluster.createClient(primaryServer)) {
+      try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
+          .stream(null, getRoutingTable(cluster.getGroup().getPeers(), primaryServer))) {
         request = out.getHeader();
         reply = out.getRaftClientReplyFuture();
 
@@ -90,8 +94,10 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
       CheckedConsumer<DataStreamOutputImpl, Exception> testCase) throws Exception {
     final RaftClientRequest request;
     final CompletableFuture<RaftClientReply> reply;
-    try (RaftClient client = cluster.createClient()) {
-      try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
+    final RaftPeer primaryServer = CollectionUtils.random(cluster.getGroup().getPeers());
+    try (RaftClient client = cluster.createClient(primaryServer)) {
+      try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
+          .stream(null, getRoutingTable(cluster.getGroup().getPeers(), primaryServer))) {
         request = out.getHeader();
         reply = out.getRaftClientReplyFuture();
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
similarity index 94%
copy from ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
copy to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
index 13121c3..e4e9fef 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.datastream;
 
-public class TestNettyDataStreamWithGrpcCluster
+public class TestNettyDataStreamChainTopologyWithGrpcCluster // no getRoutingTable override: uses the inherited default (presumably chain topology; TODO confirm)
     extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
     implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
similarity index 59%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
index 13121c3..cd6bbc7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
@@ -17,7 +17,25 @@
  */
 package org.apache.ratis.datastream;
 
-public class TestNettyDataStreamWithGrpcCluster
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TestNettyDataStreamStarTopologyWithGrpcCluster
     extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
     implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
+
+  /** Builds a star routing table: every peer other than {@code primary} is a successor of {@code primary}. */
+  @Override
+  public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
+    final List<RaftPeerId> others = peers.stream().map(RaftPeer::getId)
+        .filter(id -> !id.equals(primary.getId())).collect(Collectors.toList());
+    final RoutingTable.Builder builder = RoutingTable.newBuilder();
+    builder.addSuccessors(primary.getId(), others);
+    return builder.build();
+  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 10803fa..9b4e36b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -92,6 +92,7 @@ public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
 
       when(raftServer.getProperties()).thenReturn(properties);
       when(raftServer.getId()).thenReturn(peerId);
+      when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build());
       if (getStateMachineException == null) {
         MyDivision myDivision = new MyDivision(raftServer);
         when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);