You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/06 10:21:28 UTC
[incubator-ratis] branch master updated: RATIS-1186. Change the
FileStore CLI to use Streaming (#325)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7849eee RATIS-1186. Change the FileStore CLI to use Streaming (#325)
7849eee is described below
commit 7849eee948c47d5a1763fc4d2cb85de164c8f98c
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Sun Dec 6 18:21:18 2020 +0800
RATIS-1186. Change the FileStore CLI to use Streaming (#325)
* RATIS-1186. Change the FileStore CLI to use Streaming
* fix code review
---
.../ratis/examples/common/SubCommandBase.java | 12 +-
.../ratis/examples/filestore/cli/Client.java | 63 +++++++++
.../ratis/examples/filestore/cli/DataStream.java | 146 +++++++++++++++++++++
.../ratis/examples/filestore/cli/FileStore.java | 1 +
.../ratis/examples/filestore/cli/LoadGen.java | 104 ++++++++-------
.../ratis/examples/filestore/cli/Server.java | 23 ++++
.../apache/ratis/server/impl/RaftServerProxy.java | 4 +-
7 files changed, 305 insertions(+), 48 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
index ccb466b..15f1c37 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
@@ -34,14 +34,15 @@ public abstract class SubCommandBase {
private String raftGroupId = "demoRaftGroup123";
@Parameter(names = {"--peers", "-r"}, description =
- "Raft peers (format: name:host:port,"
+ "Raft peers (format: name:host:port:dataStreamPort,"
+ "name:host:port)", required = true)
private String peers;
public static RaftPeer[] parsePeers(String peers) {
return Stream.of(peers.split(",")).map(address -> {
String[] addressParts = address.split(":");
- return RaftPeer.newBuilder().setId(addressParts[0]).setAddress(addressParts[1] + ":" + addressParts[2]).build();
+ return RaftPeer.newBuilder().setId(addressParts[0]).setAddress(addressParts[1] + ":" + addressParts[2])
+ .setDataStreamAddress(addressParts[1] + ":" + addressParts[3]).build();
}).toArray(RaftPeer[]::new);
}
@@ -49,6 +50,13 @@ public abstract class SubCommandBase {
return parsePeers(peers);
}
+  /**
+   * Returns the primary peer, which is the first entry of the parsed {@code --peers} value.
+   *
+   * @return the first peer produced by {@link #parsePeers(String)}
+   */
+  public RaftPeer getPrimary() {
+    final RaftPeer[] parsed = parsePeers(peers);
+    return parsed[0];
+  }
+
+  /**
+   * Tells whether the given id names the primary peer.
+   *
+   * @param id the peer id to check, compared with the string form of the primary peer's id
+   * @return true if {@code id} equals the id of {@link #getPrimary()}
+   */
+  public boolean isPrimary(String id) {
+    final String primaryId = getPrimary().getId().toString();
+    return primaryId.equals(id);
+  }
public abstract void run() throws Exception;
public String getRaftGroupId() {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 916d1c1..fb12f45 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -17,11 +17,14 @@
*/
package org.apache.ratis.examples.filestore.cli;
+import com.beust.jcommander.Parameter;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.examples.common.SubCommandBase;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
@@ -35,6 +38,9 @@ import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@@ -42,6 +48,27 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class Client extends SubCommandBase {
+  /** Value of {@code --size}: the size in bytes of each file. */
+  @Parameter(names = {"--size"}, description = "Size of each file in bytes", required = true)
+  private int fileSizeInBytes;
+
+  /** Value of {@code --bufferSize}: the size in bytes of the buffer. */
+  @Parameter(
+      names = {"--bufferSize"},
+      description = "Size of buffer in bytes, should less than 4MB, " + "i.e BUFFER_BYTE_LIMIT_DEFAULT",
+      required = true)
+  private int bufferSizeInBytes;
+
+  /** Value of {@code --numFiles}: the number of files to be written. */
+  @Parameter(names = {"--numFiles"}, description = "Number of files to be written", required = true)
+  private int numFiles;
+
+  /** @return the value given for {@code --size} */
+  public int getFileSizeInBytes() {
+    return this.fileSizeInBytes;
+  }
+
+  /** @return the value given for {@code --bufferSize} */
+  public int getBufferSizeInBytes() {
+    return this.bufferSizeInBytes;
+  }
+
+  /** @return the value given for {@code --numFiles} */
+  public int getNumFiles() {
+    return this.numFiles;
+  }
@Override
public void run() throws Exception {
@@ -58,6 +85,7 @@ public abstract class Client extends SubCommandBase {
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.setSegmentSizeMax(raftProperties,
SizeInBytes.valueOf(1 * 1024 * 1024 * 1024L));
+ RaftConfigKeys.DataStream.setType(raftProperties, SupportedDataStreamType.NETTY);
RaftServerConfigKeys.Log.setSegmentCacheNumMax(raftProperties, 2);
@@ -73,10 +101,45 @@ public abstract class Client extends SubCommandBase {
RaftClient.newBuilder().setProperties(raftProperties);
builder.setRaftGroup(raftGroup);
builder.setClientRpc(new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties));
+ builder.setPrimaryDataStreamServer(getPrimary());
RaftClient client = builder.build();
operation(client);
}
+  /**
+   * Creates {@code numFiles} local files by calling {@link #writeFile(String, int, int)} with the
+   * configured file size and buffer size, and returns their paths.
+   *
+   * Each file is named {@code file-<entropy>-<index>}, where the entropy is one random alphanumeric
+   * string shared by all files of this call.  The entropy has a fixed length: tying it to the number
+   * of files would yield over-long file names for large runs and collision-prone names for small ones.
+   *
+   * @return the paths of the generated files, in creation order
+   * @throws IOException if a file cannot be written
+   */
+  public List<String> generateFiles() throws IOException {
+    final String entropy = RandomStringUtils.randomAlphanumeric(10);
+    final List<String> paths = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      final String path = "file-" + entropy + "-" + i;
+      paths.add(path);
+      writeFile(path, fileSizeInBytes, bufferSizeInBytes);
+    }
+    return paths;
+  }
+
+  /**
+   * Writes {@code fileSize} bytes to the file at {@code path}, starting at the beginning of the file,
+   * in chunks of at most {@code bufferSize} bytes.
+   *
+   * Every chunk is filled with the letter pattern {@code 'A' + i % 23}, where {@code i} is the index
+   * within the chunk, so the pattern restarts at each chunk boundary.  The file is opened in
+   * {@code "rw"} mode; when {@code fileSize} is not positive the file is opened but nothing is written.
+   *
+   * @param path the file to write
+   * @param fileSize the number of bytes to write
+   * @param bufferSize the maximum number of bytes per write call
+   * @throws IOException if the file cannot be opened or written
+   * @throws IllegalArgumentException if there is data to write and {@code bufferSize} is not positive
+   */
+  public void writeFile(String path, int fileSize, int bufferSize) throws IOException {
+    if (fileSize > 0 && bufferSize <= 0) {
+      // A zero buffer would never make progress and a negative one cannot be allocated.
+      throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
+    }
+    try (RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
+      int offset = 0;
+      while (offset < fileSize) {
+        final int chunkSize = Math.min(fileSize - offset, bufferSize);
+        final byte[] buffer = new byte[chunkSize];
+        for (int i = 0; i < chunkSize; i++) {
+          buffer[i] = (byte) ('A' + i % 23);
+        }
+        raf.write(buffer);
+        offset += chunkSize;
+      }
+    }
+  }
+
protected abstract void operation(RaftClient client) throws IOException;
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
new file mode 100644
index 0000000..5ca07d2
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.examples.filestore.FileStoreClient;
+import org.apache.ratis.protocol.DataStreamReply;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Subcommand to generate load in filestore data stream state machine.
+ *
+ * The files produced by {@link #generateFiles()} are streamed with the strategy selected by
+ * {@code --type}: {@code DirectByteBuffer}, {@code MappedByteBuffer} or {@code NettyFileRegion}.
+ */
+@Parameters(commandDescription = "Load Generator for FileStore DataStream")
+public class DataStream extends Client {
+
+  @Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true)
+  private String dataStreamType = "NettyFileRegion";
+
+  /**
+   * Generates the local files, streams them, waits for every reply and prints a summary,
+   * then closes the client and exits the JVM.
+   *
+   * @param client the client used to build the {@link FileStoreClient}
+   * @throws IOException if generating, reading or streaming a file fails
+   */
+  @Override
+  protected void operation(RaftClient client) throws IOException {
+    final List<String> paths = generateFiles();
+    final FileStoreClient fileStoreClient = new FileStoreClient(client);
+    System.out.println("Starting DataStream write now ");
+
+    final long startTime = System.currentTimeMillis();
+
+    final long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient));
+
+    final long endTime = System.currentTimeMillis();
+
+    System.out.println("Total files written: " + getNumFiles());
+    System.out.println("Each files size: " + getFileSizeInBytes());
+    System.out.println("Total data written: " + totalWrittenBytes + " bytes");
+    System.out.println("Total time taken: " + (endTime - startTime) + " millis");
+
+    client.close();
+    System.exit(0);
+  }
+
+  /**
+   * Submits the asynchronous stream writes for every path.
+   *
+   * An unsupported {@code --type} is reported once on stderr and no stream is opened.
+   *
+   * @param paths the local files to stream
+   * @param fileStoreClient the client used to open one stream per file
+   * @return for each path, the futures of the writes submitted for it
+   * @throws IOException if a file cannot be opened or read
+   */
+  private Map<String, List<CompletableFuture<DataStreamReply>>> streamWrite(
+      List<String> paths, FileStoreClient fileStoreClient) throws IOException {
+    final Map<String, List<CompletableFuture<DataStreamReply>>> fileMap = new HashMap<>();
+
+    final boolean direct = "DirectByteBuffer".equals(dataStreamType);
+    final boolean mapped = "MappedByteBuffer".equals(dataStreamType);
+    final boolean fileRegion = "NettyFileRegion".equals(dataStreamType);
+    if (!direct && !mapped && !fileRegion) {
+      // Reject an unknown type before any stream is opened.
+      System.err.println(
+          "Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, NettyFileRegion");
+      return fileMap;
+    }
+
+    for (String path : paths) {
+      final File file = new File(path);
+      final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());
+
+      if (fileRegion) {
+        // This strategy hands over the File itself, so no input stream is needed.
+        fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file));
+        continue;
+      }
+
+      // The channel is only needed while the writes are being submitted: direct buffers are filled
+      // before they are handed over, and a mapping stays valid after its channel is closed.
+      try (FileInputStream fis = new FileInputStream(file)) {
+        final FileChannel channel = fis.getChannel();
+        fileMap.put(path, direct
+            ? writeByDirectByteBuffer(dataStreamOutput, channel)
+            : writeByMappedByteBuffer(dataStreamOutput, channel));
+      }
+    }
+    return fileMap;
+  }
+
+  /**
+   * Waits for all submitted writes and sums the bytes reported by their replies.
+   *
+   * A message is printed for every file whose reported total differs from the configured file size.
+   *
+   * @param fileMap the futures returned by {@code streamWrite}
+   * @return the total number of bytes reported as written
+   */
+  private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply>>> fileMap) {
+    long totalBytes = 0;
+    for (List<CompletableFuture<DataStreamReply>> futures : fileMap.values()) {
+      long writtenLen = 0;
+      for (CompletableFuture<DataStreamReply> future : futures) {
+        writtenLen += future.join().getBytesWritten();
+      }
+
+      if (writtenLen != getFileSizeInBytes()) {
+        System.out.println("File written:" + writtenLen + " does not match expected:" + getFileSizeInBytes());
+      }
+
+      totalBytes += writtenLen;
+    }
+    return totalBytes;
+  }
+
+  /**
+   * Streams the channel's content in chunks of at most {@code --bufferSize} bytes, each read into its
+   * own direct buffer.  The write that reaches the end of the file is submitted with the flag set to true.
+   *
+   * @param dataStreamOutput the stream to write to
+   * @param fileChannel the channel to read from, positioned at the start of the file
+   * @return one future per submitted chunk
+   * @throws IOException if reading the channel fails
+   */
+  private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
+      FileChannel fileChannel) throws IOException {
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    final long fileSize = fileChannel.size();
+    final int bufferSize = getBufferSizeInBytes();
+
+    long offset = 0L;
+    while (offset < fileSize) {
+      // A fresh buffer per chunk: the previous one may still be in use by its pending write.
+      final ByteBuffer byteBuffer = ByteBuffer.allocateDirect((int) Math.min(fileSize - offset, bufferSize));
+      // A single read may return fewer bytes than requested, so keep reading until the chunk is full.
+      while (byteBuffer.hasRemaining() && fileChannel.read(byteBuffer) > 0) {
+        // keep filling the chunk
+      }
+      byteBuffer.flip();
+      if (!byteBuffer.hasRemaining()) {
+        // Nothing could be read (zero-sized buffer or unexpected end of file): stop instead of spinning.
+        break;
+      }
+      offset += byteBuffer.remaining();
+      futures.add(dataStreamOutput.writeAsync(byteBuffer, offset == fileSize));
+    }
+
+    return futures;
+  }
+
+  /**
+   * Maps the whole file read-only and submits it as a single write with the flag set to true.
+   *
+   * @param dataStreamOutput the stream to write to
+   * @param fileChannel the channel of the file to map
+   * @return a list holding the single write future
+   * @throws IOException if the file cannot be mapped
+   */
+  private List<CompletableFuture<DataStreamReply>> writeByMappedByteBuffer(DataStreamOutput dataStreamOutput,
+      FileChannel fileChannel) throws IOException {
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    // Map the actual file size, which is also the length announced when the stream was opened.
+    final MappedByteBuffer mappedByteBuffer =
+        fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
+    futures.add(dataStreamOutput.writeAsync(mappedByteBuffer, true));
+    return futures;
+  }
+
+  /**
+   * Submits the file itself as a single write.
+   *
+   * @param dataStreamOutput the stream to write to
+   * @param file the file to send
+   * @return a list holding the single write future
+   */
+  private List<CompletableFuture<DataStreamReply>> writeByNettyFileRegion(
+      DataStreamOutput dataStreamOutput, File file) {
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    futures.add(dataStreamOutput.writeAsync(file));
+    return futures;
+  }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/FileStore.java
index dd19c6c..9d50e34 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/FileStore.java
@@ -34,6 +34,7 @@ public final class FileStore {
List<SubCommandBase> commands = new ArrayList<>();
commands.add(new Server());
commands.add(new LoadGen());
+ commands.add(new DataStream());
return commands;
}
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index 0731405..5a14e65 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -17,20 +17,19 @@
*/
package org.apache.ratis.examples.filestore.cli;
-import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.examples.filestore.FileStoreClient;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Subcommand to generate load in filestore state machine.
@@ -38,56 +37,73 @@ import java.util.concurrent.atomic.AtomicLong;
@Parameters(commandDescription = "Load Generator for FileStore")
public class LoadGen extends Client {
- private static final String UTF8_CSN = StandardCharsets.UTF_8.name();
-
- @Parameter(names = {"--size"}, description = "Size of each file", required = true)
- private String size;
-
- @Parameter(names = {"--numFiles"}, description = "Number of files", required = true)
- private String numFiles;
-
- private static byte[] string2Bytes(String str) {
- try {
- return str.getBytes(UTF8_CSN);
- } catch (UnsupportedEncodingException e) {
- throw new IllegalArgumentException("UTF8 decoding is not supported", e);
- }
- }
-
@Override
protected void operation(RaftClient client) throws IOException {
- int length = Integer.parseInt(size);
- int num = Integer.parseInt(numFiles);
- AtomicLong totalBytes = new AtomicLong(0);
- String entropy = RandomStringUtils.randomAlphanumeric(10);
-
- byte[] fileValue = string2Bytes(RandomStringUtils.randomAscii(length));
+ List<String> paths = generateFiles();
FileStoreClient fileStoreClient = new FileStoreClient(client);
+ System.out.println("Starting Async write now ");
- System.out.println("Starting load now ");
long startTime = System.currentTimeMillis();
- List<CompletableFuture<Long>> futures = new ArrayList<>();
- for (int i = 0; i < num; i++) {
- String path = "file-" + entropy + "-" + i;
- ByteBuffer b = ByteBuffer.wrap(fileValue);
- futures.add(fileStoreClient.writeAsync(path, 0, true, b));
- }
- for (CompletableFuture<Long> future : futures) {
- Long writtenLen = future.join();
- totalBytes.addAndGet(writtenLen);
- if (writtenLen != length) {
- System.out.println("File length written is wrong: " + writtenLen + length);
- }
- }
+ long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient));
+
long endTime = System.currentTimeMillis();
- System.out.println("Total files written: " + futures.size());
- System.out.println("Each files size: " + length);
- System.out.println("Total data written: " + totalBytes + " bytes");
+ System.out.println("Total files written: " + getNumFiles());
+ System.out.println("Each files size: " + getFileSizeInBytes());
+ System.out.println("Total data written: " + totalWrittenBytes + " bytes");
System.out.println("Total time taken: " + (endTime - startTime) + " millis");
client.close();
System.exit(0);
}
+
+  /**
+   * Reads every file in chunks of at most {@code --bufferSize} bytes and submits each chunk as an
+   * asynchronous write at its offset.  The write that reaches the end of the file is submitted with
+   * the flag set to true.
+   *
+   * @param paths the local files to send
+   * @param fileStoreClient the client used to submit the writes
+   * @return for each path, the futures of the writes submitted for it
+   * @throws IOException if a file cannot be opened or read
+   */
+  private Map<String, List<CompletableFuture<Long>>> writeByHeapByteBuffer(
+      List<String> paths, FileStoreClient fileStoreClient) throws IOException {
+    final Map<String, List<CompletableFuture<Long>>> fileMap = new HashMap<>();
+    final int bufferSize = getBufferSizeInBytes();
+
+    for (String path : paths) {
+      final List<CompletableFuture<Long>> futures = new ArrayList<>();
+      final File file = new File(path);
+      final long fileSize = file.length();
+
+      // Close the input stream once all chunks of this file have been read.
+      try (FileInputStream fis = new FileInputStream(file)) {
+        long offset = 0L;
+        while (offset < fileSize) {
+          // A fresh array per chunk: the previous one may still be in use by its pending write.
+          final byte[] buffer = new byte[(int) Math.min(fileSize - offset, bufferSize)];
+
+          // A single read may return fewer bytes than requested, so keep reading until the chunk is full.
+          int length = 0;
+          while (length < buffer.length) {
+            final int n = fis.read(buffer, length, buffer.length - length);
+            if (n < 0) {
+              break;
+            }
+            length += n;
+          }
+          if (length == 0) {
+            // Nothing could be read (zero-sized buffer or unexpected end of file): stop instead of spinning.
+            break;
+          }
+
+          final boolean last = offset + length == fileSize;
+          futures.add(fileStoreClient.writeAsync(path, offset, last, ByteBuffer.wrap(buffer, 0, length)));
+          offset += length;
+        }
+      }
+
+      fileMap.put(path, futures);
+    }
+
+    return fileMap;
+  }
+
+  /**
+   * Waits for all submitted writes and sums the lengths they report.
+   *
+   * A message is printed for every file whose reported total differs from the configured file size.
+   *
+   * @param fileMap the write futures of each file
+   * @return the total number of bytes reported as written
+   */
+  private long waitWriteFinish(Map<String, List<CompletableFuture<Long>>> fileMap) {
+    long grandTotal = 0;
+    for (List<CompletableFuture<Long>> perFile : fileMap.values()) {
+      // Joins the futures one after another, in list order.
+      final long fileTotal = perFile.stream().mapToLong(CompletableFuture::join).sum();
+
+      if (fileTotal != getFileSizeInBytes()) {
+        System.out.println("File written:" + fileTotal + " does not match expected:" + getFileSizeInBytes());
+      }
+
+      grandTotal += fileTotal;
+    }
+    return grandTotal;
+  }
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 9af81ff..d135c59 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -19,18 +19,23 @@ package org.apache.ratis.examples.filestore.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
+import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.examples.common.SubCommandBase;
import org.apache.ratis.examples.filestore.FileStoreCommon;
import org.apache.ratis.examples.filestore.FileStoreStateMachine;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.metrics.JVMMetrics;
+import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
@@ -38,8 +43,11 @@ import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TimeDuration;
import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* Class to start a ratis arithmetic example server.
@@ -60,8 +68,15 @@ public class Server extends SubCommandBase {
RaftPeerId peerId = RaftPeerId.valueOf(id);
RaftProperties properties = new RaftProperties();
+ // Avoid leader changes affecting the performance
+ RaftServerConfigKeys.Rpc.setTimeoutMin(properties, TimeDuration.valueOf(2, TimeUnit.SECONDS));
+ RaftServerConfigKeys.Rpc.setTimeoutMax(properties, TimeDuration.valueOf(3, TimeUnit.SECONDS));
+
final int port = NetUtils.createSocketAddr(getPeer(peerId).getAddress()).getPort();
GrpcConfigKeys.Server.setPort(properties, port);
+ final int dataStreamport = NetUtils.createSocketAddr(getPeer(peerId).getDataStreamAddress()).getPort();
+ NettyConfigKeys.DataStream.setPort(properties, dataStreamport);
+ RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, Integer.MAX_VALUE);
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
ConfUtils.setFile(properties::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
@@ -78,9 +93,17 @@ public class Server extends SubCommandBase {
raftServer.start();
+ if (isPrimary(id)) {
+ ((RaftServerProxy) raftServer).getDataStreamServerRpc()
+ .addRaftPeers(getOtherRaftPeers(Arrays.asList(getPeers())));
+ }
+
for (; raftServer.getLifeCycleState() != LifeCycle.State.CLOSED; ) {
TimeUnit.SECONDS.sleep(1);
}
}
+  /**
+   * Filters out the peer whose id, in string form, equals the {@code id} field.
+   *
+   * @param peers the peers to filter
+   * @return the remaining peers, in encounter order
+   */
+  private Collection<RaftPeer> getOtherRaftPeers(Collection<RaftPeer> peers) {
+    return peers.stream()
+        .filter(peer -> {
+          final String candidateId = peer.getId().toString();
+          return !candidateId.equals(id);
+        })
+        .collect(Collectors.toList());
+  }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 81b010c..834b90a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -63,7 +63,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-class RaftServerProxy implements RaftServer {
+public class RaftServerProxy implements RaftServer {
/**
* A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
*
@@ -306,7 +306,7 @@ class RaftServerProxy implements RaftServer {
return serverRpc;
}
- DataStreamServerRpc getDataStreamServerRpc() {
+ public DataStreamServerRpc getDataStreamServerRpc() {
return dataStreamServerRpc;
}