You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/06 10:21:28 UTC

[incubator-ratis] branch master updated: RATIS-1186. Change the FileStore CLI to use Streaming (#325)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7849eee  RATIS-1186. Change the FileStore CLI to use Streaming (#325)
7849eee is described below

commit 7849eee948c47d5a1763fc4d2cb85de164c8f98c
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Sun Dec 6 18:21:18 2020 +0800

    RATIS-1186. Change the FileStore CLI to use Streaming (#325)
    
    * RATIS-1186. Change the FileStore CLI to use Streaming
    
    * fix code review
---
 .../ratis/examples/common/SubCommandBase.java      |  12 +-
 .../ratis/examples/filestore/cli/Client.java       |  63 +++++++++
 .../ratis/examples/filestore/cli/DataStream.java   | 146 +++++++++++++++++++++
 .../ratis/examples/filestore/cli/FileStore.java    |   1 +
 .../ratis/examples/filestore/cli/LoadGen.java      | 104 ++++++++-------
 .../ratis/examples/filestore/cli/Server.java       |  23 ++++
 .../apache/ratis/server/impl/RaftServerProxy.java  |   4 +-
 7 files changed, 305 insertions(+), 48 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
index ccb466b..15f1c37 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
@@ -34,14 +34,15 @@ public abstract class SubCommandBase {
   private String raftGroupId = "demoRaftGroup123";
 
   @Parameter(names = {"--peers", "-r"}, description =
-      "Raft peers (format: name:host:port,"
+      "Raft peers (format: name:host:port:dataStreamPort,"
           + "name:host:port)", required = true)
   private String peers;
 
   public static RaftPeer[] parsePeers(String peers) {
     return Stream.of(peers.split(",")).map(address -> {
       String[] addressParts = address.split(":");
-      return RaftPeer.newBuilder().setId(addressParts[0]).setAddress(addressParts[1] + ":" + addressParts[2]).build();
+      return RaftPeer.newBuilder().setId(addressParts[0]).setAddress(addressParts[1] + ":" + addressParts[2])
+          .setDataStreamAddress(addressParts[1] + ":" + addressParts[3]).build();
     }).toArray(RaftPeer[]::new);
   }
 
@@ -49,6 +50,13 @@ public abstract class SubCommandBase {
     return parsePeers(peers);
   }
 
+  /**
+   * Returns the primary peer, i.e. the first entry of the parsed {@code --peers} list.
+   *
+   * @return the first peer given on the command line
+   */
+  public RaftPeer getPrimary() {
+    final RaftPeer[] parsed = parsePeers(peers);
+    return parsed[0];
+  }
+
+  /**
+   * Tells whether the given id is the id of the peer returned by {@link #getPrimary()}.
+   *
+   * @param id the peer id to compare, as a plain string
+   * @return true iff {@code id} equals the string form of the primary peer's id
+   */
+  public boolean isPrimary(String id) {
+    final String primaryId = getPrimary().getId().toString();
+    return primaryId.equals(id);
+  }
   public abstract void run() throws Exception;
 
   public String getRaftGroupId() {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 916d1c1..fb12f45 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -17,11 +17,14 @@
  */
 package org.apache.ratis.examples.filestore.cli;
 
+import com.beust.jcommander.Parameter;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.examples.common.SubCommandBase;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcFactory;
@@ -35,6 +38,9 @@ import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -42,6 +48,27 @@ import java.util.concurrent.TimeUnit;
  */
 public abstract class Client extends SubCommandBase {
 
+  @Parameter(names = {"--size"}, description = "Size of each file in bytes", required = true)
+  private int fileSizeInBytes;
+
+  @Parameter(names = {"--bufferSize"}, description = "Size of buffer in bytes, should less than 4MB, " +
+      "i.e BUFFER_BYTE_LIMIT_DEFAULT", required = true)
+  private int bufferSizeInBytes;
+
+  @Parameter(names = {"--numFiles"}, description = "Number of files to be written", required = true)
+  private int numFiles;
+
+  /** @return the value of the {@code --size} option, i.e. the size of each file in bytes. */
+  public int getFileSizeInBytes() {
+    return this.fileSizeInBytes;
+  }
+
+  /** @return the value of the {@code --bufferSize} option, i.e. the size of the buffer in bytes. */
+  public int getBufferSizeInBytes() {
+    return this.bufferSizeInBytes;
+  }
+
+  /** @return the value of the {@code --numFiles} option, i.e. the number of files to be written. */
+  public int getNumFiles() {
+    return this.numFiles;
+  }
 
   @Override
   public void run() throws Exception {
@@ -58,6 +85,7 @@ public abstract class Client extends SubCommandBase {
         SizeInBytes.valueOf(raftSegmentPreallocatedSize));
     RaftServerConfigKeys.Log.setSegmentSizeMax(raftProperties,
         SizeInBytes.valueOf(1 * 1024 * 1024 * 1024L));
+    RaftConfigKeys.DataStream.setType(raftProperties, SupportedDataStreamType.NETTY);
 
     RaftServerConfigKeys.Log.setSegmentCacheNumMax(raftProperties, 2);
 
@@ -73,10 +101,45 @@ public abstract class Client extends SubCommandBase {
         RaftClient.newBuilder().setProperties(raftProperties);
     builder.setRaftGroup(raftGroup);
     builder.setClientRpc(new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties));
+    builder.setPrimaryDataStreamServer(getPrimary());
     RaftClient client = builder.build();
 
     operation(client);
   }
 
+  /**
+   * Creates {@code numFiles} local files of {@code fileSizeInBytes} bytes each.
+   *
+   * The files are named {@code file-<entropy>-<index>} (relative paths), where the entropy is a
+   * short random alphanumeric string shared by all files of this run.
+   *
+   * @return the paths of the generated files, in creation order
+   * @throws IOException if a file cannot be created or written
+   */
+  public List<String> generateFiles() throws IOException {
+    // The random part only has to distinguish this run from other runs, so keep its length fixed.
+    // Sizing it by numFiles would make every file name grow with the file count and eventually
+    // exceed the maximum file-name length of the file system.
+    String entropy = RandomStringUtils.randomAlphanumeric(10);
+    List<String> paths = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      String path = "file-" + entropy + "-" + i;
+      paths.add(path);
+      writeFile(path, fileSizeInBytes, bufferSizeInBytes);
+    }
+
+    return paths;
+  }
+
+  /**
+   * Writes {@code fileSize} bytes of a repeating letter pattern to the start of the given file.
+   *
+   * The data is written in chunks of at most {@code bufferSize} bytes; within every chunk the byte
+   * at index {@code i} is {@code 'A' + i % 23}. The file is created if it does not exist. When
+   * {@code fileSize} is not positive nothing is written.
+   *
+   * @param path       the file to write
+   * @param fileSize   the number of bytes to write
+   * @param bufferSize the maximum chunk size in bytes; must be positive when {@code fileSize} is
+   * @throws IOException              if the file cannot be opened or written
+   * @throws IllegalArgumentException if data has to be written but {@code bufferSize} is not positive
+   */
+  public void writeFile(String path, int fileSize, int bufferSize) throws IOException {
+    // A zero chunk size would never advance the offset (endless loop) and a negative one cannot
+    // be allocated, so reject both up front.
+    if (fileSize > 0 && bufferSize <= 0) {
+      throw new IllegalArgumentException("bufferSize must be positive but was " + bufferSize);
+    }
+
+    // The pattern restarts at index 0 in every chunk, so a shorter (last) chunk is simply a prefix
+    // of a full one: build the pattern once and reuse it for all chunks.
+    final byte[] pattern = new byte[Math.max(0, Math.min(fileSize, bufferSize))];
+    for (int i = 0; i < pattern.length; i++) {
+      pattern[i] = (byte) ('A' + i % 23);
+    }
+
+    try (RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
+      int offset = 0;
+      while (offset < fileSize) {
+        final int chunkSize = Math.min(fileSize - offset, pattern.length);
+        raf.write(pattern, 0, chunkSize);
+        offset += chunkSize;
+      }
+    }
+  }
+
   protected abstract void operation(RaftClient client) throws IOException;
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
new file mode 100644
index 0000000..5ca07d2
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.examples.filestore.FileStoreClient;
+import org.apache.ratis.protocol.DataStreamReply;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Subcommand to generate load in filestore data stream state machine.
+ */
+@Parameters(commandDescription = "Load Generator for FileStore DataStream")
+public class DataStream extends Client {
+
+  @Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true)
+  private String dataStreamType = "NettyFileRegion";
+
+  /**
+   * Generates the local files, streams them to the file store, waits for all replies and prints
+   * a summary. Afterwards the client is closed and the JVM exits with status 0.
+   *
+   * @param client the client used to create the {@link FileStoreClient}
+   * @throws IOException if generating, reading or streaming a file fails
+   */
+  @Override
+  protected void operation(RaftClient client) throws IOException {
+    final List<String> generated = generateFiles();
+    final FileStoreClient storeClient = new FileStoreClient(client);
+    System.out.println("Starting DataStream write now ");
+
+    final long begin = System.currentTimeMillis();
+    final Map<String, List<CompletableFuture<DataStreamReply>>> pending = streamWrite(generated, storeClient);
+    final long totalWrittenBytes = waitStreamFinish(pending);
+    final long elapsedMillis = System.currentTimeMillis() - begin;
+
+    System.out.println("Total files written: " + getNumFiles());
+    System.out.println("Each files size: " + getFileSizeInBytes());
+    System.out.println("Total data written: " + totalWrittenBytes + " bytes");
+    System.out.println("Total time taken: " + elapsedMillis + " millis");
+
+    client.close();
+    System.exit(0);
+  }
+
+  /**
+   * Starts the stream write of every given file using the strategy selected by {@code --type}.
+   *
+   * An unknown type is reported on stderr and the file is skipped, so it has no entry in the
+   * returned map.
+   *
+   * @param paths           the local files to stream
+   * @param fileStoreClient the client used to open one stream output per file
+   * @return for each written path, the futures of the replies of its writes
+   * @throws IOException if a file cannot be opened or read
+   */
+  private Map<String, List<CompletableFuture<DataStreamReply>>> streamWrite(
+      List<String> paths, FileStoreClient fileStoreClient) throws IOException {
+    Map<String, List<CompletableFuture<DataStreamReply>>> fileMap = new HashMap<>();
+    for (String path : paths) {
+      File file = new File(path);
+      // Close the input stream (and its channel) once the writes of this file are submitted;
+      // otherwise one file descriptor per file stays open until the JVM exits.
+      // This is safe for all three strategies: the direct-buffer path has copied the data into
+      // its buffers before returning, a mapped buffer stays valid after its channel is closed,
+      // and the file-region path is handed the File itself and does not use this stream.
+      try (FileInputStream fis = new FileInputStream(file)) {
+        final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());
+
+        if (dataStreamType.equals("DirectByteBuffer")) {
+          fileMap.put(path, writeByDirectByteBuffer(dataStreamOutput, fis.getChannel()));
+        } else if (dataStreamType.equals("MappedByteBuffer")) {
+          fileMap.put(path, writeByMappedByteBuffer(dataStreamOutput, fis.getChannel()));
+        } else if (dataStreamType.equals("NettyFileRegion")) {
+          fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file));
+        } else {
+          System.err.println(
+              "Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, NettyFileRegion");
+        }
+      }
+    }
+    return fileMap;
+  }
+
+  /**
+   * Blocks until every pending stream write has completed and sums up the written bytes.
+   * A message is printed for each file whose written byte count differs from the expected size.
+   *
+   * @param fileMap the reply futures of each file
+   * @return the number of bytes written over all files
+   */
+  private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply>>> fileMap) {
+    final int expected = getFileSizeInBytes();
+    long grandTotal = 0;
+    for (List<CompletableFuture<DataStreamReply>> replies : fileMap.values()) {
+      long fileTotal = 0;
+      for (CompletableFuture<DataStreamReply> reply : replies) {
+        fileTotal += reply.join().getBytesWritten();
+      }
+
+      if (fileTotal != expected) {
+        System.out.println("File written:" + fileTotal + " does not match expected:" + expected);
+      }
+
+      grandTotal += fileTotal;
+    }
+    return grandTotal;
+  }
+
+  /**
+   * Reads the file chunk by chunk into direct buffers and submits every chunk as an async write.
+   *
+   * A chunk holds at most {@code --bufferSize} bytes. Every chunk gets a buffer of its own,
+   * presumably because the buffer may still be in use after {@code writeAsync} has returned.
+   * The flag passed along with a chunk is true only for the chunk that completes the expected
+   * file size (NOTE(review): presumably this marks the final write - confirm against
+   * DataStreamOutput).
+   *
+   * @param dataStreamOutput the stream to write to
+   * @param fileChannel      the channel of the file to read, positioned at the data to send
+   * @return the reply futures of the submitted writes, in submission order
+   * @throws IOException if reading from the channel fails
+   */
+  private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
+      FileChannel fileChannel) throws IOException {
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    final int fileSize = getFileSizeInBytes();
+    final int bufferSize = getBufferSizeInBytes();
+
+    long offset = 0L;
+    while (offset < fileSize) {
+      final int chunkSize = (int) Math.min(fileSize - offset, bufferSize);
+      final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(chunkSize);
+
+      // A single read may return fewer bytes than requested; keep reading until the chunk is
+      // complete or the end of the file is reached.
+      while (byteBuffer.hasRemaining()) {
+        if (fileChannel.read(byteBuffer) < 0) {
+          break;
+        }
+      }
+      if (byteBuffer.position() == 0) {
+        // Nothing could be read (end of file or an empty chunk): stop instead of spinning.
+        break;
+      }
+
+      byteBuffer.flip();
+      // Advance by the bytes actually read so that the last-chunk flag stays correct.
+      offset += byteBuffer.remaining();
+      futures.add(dataStreamOutput.writeAsync(byteBuffer, offset == fileSize));
+    }
+
+    return futures;
+  }
+
+  /**
+   * Maps the first {@code --size} bytes of the file read-only and submits them as one async write.
+   *
+   * @param dataStreamOutput the stream to write to
+   * @param fileChannel      the channel of the file to map
+   * @return a list holding the reply future of the single write
+   * @throws IOException if the file cannot be mapped
+   */
+  private List<CompletableFuture<DataStreamReply>> writeByMappedByteBuffer(DataStreamOutput dataStreamOutput,
+      FileChannel fileChannel) throws IOException {
+    final MappedByteBuffer mapped = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, getFileSizeInBytes());
+    final CompletableFuture<DataStreamReply> reply = dataStreamOutput.writeAsync(mapped, true);
+
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    futures.add(reply);
+    return futures;
+  }
+
+  /**
+   * Submits the whole file as one async write by handing the {@link File} to the stream output.
+   *
+   * @param dataStreamOutput the stream to write to
+   * @param file             the file to send
+   * @return a list holding the reply future of the single write
+   */
+  private List<CompletableFuture<DataStreamReply>> writeByNettyFileRegion(
+      DataStreamOutput dataStreamOutput, File file) {
+    final CompletableFuture<DataStreamReply> reply = dataStreamOutput.writeAsync(file);
+
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    futures.add(reply);
+    return futures;
+  }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/FileStore.java
index dd19c6c..9d50e34 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/FileStore.java
@@ -34,6 +34,7 @@ public final class FileStore {
     List<SubCommandBase> commands = new ArrayList<>();
     commands.add(new Server());
     commands.add(new LoadGen());
+    commands.add(new DataStream());
     return commands;
   }
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index 0731405..5a14e65 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -17,20 +17,19 @@
  */
 package org.apache.ratis.examples.filestore.cli;
 
-import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.examples.filestore.FileStoreClient;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Subcommand to generate load in filestore state machine.
@@ -38,56 +37,73 @@ import java.util.concurrent.atomic.AtomicLong;
 @Parameters(commandDescription = "Load Generator for FileStore")
 public class LoadGen extends Client {
 
-  private static final String UTF8_CSN = StandardCharsets.UTF_8.name();
-
-  @Parameter(names = {"--size"}, description = "Size of each file", required = true)
-  private String size;
-
-  @Parameter(names = {"--numFiles"}, description = "Number of files", required = true)
-  private String numFiles;
-
-  private static byte[] string2Bytes(String str) {
-    try {
-      return str.getBytes(UTF8_CSN);
-    } catch (UnsupportedEncodingException e) {
-      throw new IllegalArgumentException("UTF8 decoding is not supported", e);
-    }
-  }
-
   @Override
   protected void operation(RaftClient client) throws IOException {
-    int length = Integer.parseInt(size);
-    int num = Integer.parseInt(numFiles);
-    AtomicLong totalBytes = new AtomicLong(0);
-    String entropy = RandomStringUtils.randomAlphanumeric(10);
-
-    byte[] fileValue = string2Bytes(RandomStringUtils.randomAscii(length));
+    List<String> paths = generateFiles();
     FileStoreClient fileStoreClient = new FileStoreClient(client);
+    System.out.println("Starting Async write now ");
 
-    System.out.println("Starting load now ");
     long startTime = System.currentTimeMillis();
-    List<CompletableFuture<Long>> futures = new ArrayList<>();
-    for (int i = 0; i < num; i++) {
-      String path = "file-" + entropy + "-" + i;
-      ByteBuffer b = ByteBuffer.wrap(fileValue);
-      futures.add(fileStoreClient.writeAsync(path, 0, true, b));
-    }
 
-    for (CompletableFuture<Long> future : futures) {
-      Long writtenLen = future.join();
-      totalBytes.addAndGet(writtenLen);
-      if (writtenLen != length) {
-        System.out.println("File length written is wrong: " + writtenLen + length);
-      }
-    }
+    long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient));
+
     long endTime = System.currentTimeMillis();
 
-    System.out.println("Total files written: " + futures.size());
-    System.out.println("Each files size: " + length);
-    System.out.println("Total data written: " + totalBytes + " bytes");
+    System.out.println("Total files written: " + getNumFiles());
+    System.out.println("Each files size: " + getFileSizeInBytes());
+    System.out.println("Total data written: " + totalWrittenBytes + " bytes");
     System.out.println("Total time taken: " + (endTime - startTime) + " millis");
 
     client.close();
     System.exit(0);
   }
+
+  /**
+   * Reads every file chunk by chunk into heap buffers and submits each chunk as an async write.
+   *
+   * A chunk holds at most {@code --bufferSize} bytes and is written at its offset in the file.
+   * The flag passed along with a chunk is true only for the chunk that completes the expected
+   * file size (NOTE(review): presumably this closes the file - confirm against FileStoreClient).
+   *
+   * @param paths           the local files to send
+   * @param fileStoreClient the client used to submit the writes
+   * @return for each path, the futures of the written lengths of its chunks, in submission order
+   * @throws IOException if a file cannot be opened or read
+   */
+  private Map<String, List<CompletableFuture<Long>>> writeByHeapByteBuffer(
+      List<String> paths, FileStoreClient fileStoreClient) throws IOException {
+    final Map<String, List<CompletableFuture<Long>>> fileMap = new HashMap<>();
+    final int fileSize = getFileSizeInBytes();
+    final int bufferSize = getBufferSizeInBytes();
+
+    for (String path : paths) {
+      final List<CompletableFuture<Long>> futures = new ArrayList<>();
+
+      // Close the stream once all chunks are submitted; the data has been copied into the
+      // chunk buffers by then, and leaving it open leaks one file descriptor per file.
+      try (FileInputStream fis = new FileInputStream(new File(path))) {
+        long offset = 0L;
+        while (offset < fileSize) {
+          final int chunkSize = (int) Math.min(fileSize - offset, bufferSize);
+          // A fresh array per chunk: the previous one was handed over to an asynchronous write.
+          final byte[] buffer = new byte[chunkSize];
+
+          // A single read may return fewer bytes than requested; fill the chunk completely
+          // unless the end of the file is reached first.
+          int filled = 0;
+          while (filled < chunkSize) {
+            final int n = fis.read(buffer, filled, chunkSize - filled);
+            if (n < 0) {
+              break;
+            }
+            filled += n;
+          }
+          if (filled == 0) {
+            // Nothing could be read (end of file or an empty chunk): stop instead of spinning.
+            break;
+          }
+
+          final boolean lastChunk = offset + filled == fileSize;
+          // Expose only the bytes actually read, never a stale tail of the array.
+          final ByteBuffer b = ByteBuffer.wrap(buffer, 0, filled);
+          futures.add(fileStoreClient.writeAsync(path, offset, lastChunk, b));
+          offset += filled;
+        }
+      }
+
+      fileMap.put(path, futures);
+    }
+
+    return fileMap;
+  }
+
+  /**
+   * Blocks until every pending write has completed and sums up the written bytes.
+   * A message is printed for each file whose written byte count differs from the expected size.
+   *
+   * @param fileMap the futures of the written lengths of each file
+   * @return the number of bytes written over all files
+   */
+  private long waitWriteFinish(Map<String, List<CompletableFuture<Long>>> fileMap) {
+    final int expected = getFileSizeInBytes();
+    long grandTotal = 0;
+    for (List<CompletableFuture<Long>> pending : fileMap.values()) {
+      long fileTotal = 0;
+      for (CompletableFuture<Long> written : pending) {
+        fileTotal += written.join();
+      }
+
+      if (fileTotal != expected) {
+        System.out.println("File written:" + fileTotal + " does not match expected:" + expected);
+      }
+
+      grandTotal += fileTotal;
+    }
+    return grandTotal;
+  }
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 9af81ff..d135c59 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -19,18 +19,23 @@ package org.apache.ratis.examples.filestore.cli;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
+import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.examples.common.SubCommandBase;
 import org.apache.ratis.examples.filestore.FileStoreCommon;
 import org.apache.ratis.examples.filestore.FileStoreStateMachine;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.metrics.JVMMetrics;
+import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.LifeCycle;
@@ -38,8 +43,11 @@ import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.TimeDuration;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Class to start a ratis arithmetic example server.
@@ -60,8 +68,15 @@ public class Server extends SubCommandBase {
     RaftPeerId peerId = RaftPeerId.valueOf(id);
     RaftProperties properties = new RaftProperties();
 
+    // Avoid leader change affect the performance
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties, TimeDuration.valueOf(2, TimeUnit.SECONDS));
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties, TimeDuration.valueOf(3, TimeUnit.SECONDS));
+
     final int port = NetUtils.createSocketAddr(getPeer(peerId).getAddress()).getPort();
     GrpcConfigKeys.Server.setPort(properties, port);
+    final int dataStreamport = NetUtils.createSocketAddr(getPeer(peerId).getDataStreamAddress()).getPort();
+    NettyConfigKeys.DataStream.setPort(properties, dataStreamport);
+    RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
     properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, Integer.MAX_VALUE);
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
     ConfUtils.setFile(properties::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
@@ -78,9 +93,17 @@ public class Server extends SubCommandBase {
 
     raftServer.start();
 
+    if (isPrimary(id)) {
+      ((RaftServerProxy) raftServer).getDataStreamServerRpc()
+          .addRaftPeers(getOtherRaftPeers(Arrays.asList(getPeers())));
+    }
+
     for (; raftServer.getLifeCycleState() != LifeCycle.State.CLOSED; ) {
       TimeUnit.SECONDS.sleep(1);
     }
   }
 
+  /**
+   * Returns the given peers without the peer(s) whose id equals the id of this server.
+   *
+   * @param peers the peers to filter
+   * @return a new list of all peers whose id string differs from {@code id}
+   */
+  private Collection<RaftPeer> getOtherRaftPeers(Collection<RaftPeer> peers) {
+    return peers.stream()
+        .filter(peer -> {
+          final String peerId = peer.getId().toString();
+          return !peerId.equals(id);
+        })
+        .collect(Collectors.toList());
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 81b010c..834b90a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -63,7 +63,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-class RaftServerProxy implements RaftServer {
+public class RaftServerProxy implements RaftServer {
   /**
    * A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
    *
@@ -306,7 +306,7 @@ class RaftServerProxy implements RaftServer {
     return serverRpc;
   }
 
-  DataStreamServerRpc getDataStreamServerRpc() {
+  public DataStreamServerRpc getDataStreamServerRpc() {
     return dataStreamServerRpc;
   }