You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/21 08:57:19 UTC
[incubator-ratis] branch master updated: RATIS-1248. Support network topology (#364)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d26861e RATIS-1248. Support network topology (#364)
d26861e is described below
commit d26861eca140288bc8e75757e738ab5d128522d2
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Mon Dec 21 16:57:09 2020 +0800
RATIS-1248. Support network topology (#364)
---
.../org/apache/ratis/client/api/DataStreamApi.java | 5 ++
.../apache/ratis/client/impl/ClientProtoUtils.java | 24 +++++-
.../ratis/client/impl/DataStreamClientImpl.java | 8 +-
.../apache/ratis/protocol/RaftClientRequest.java | 17 ++++-
.../java/org/apache/ratis/protocol/RaftPeerId.java | 16 ++++
.../org/apache/ratis/protocol/RoutingTable.java | 88 ++++++++++++++++++++++
.../java/org/apache/ratis/util/ProtoUtils.java | 50 ++++++++++++
.../src/test/java/org/apache/ratis/BaseTest.java | 26 +++++++
.../filestore/FileStoreStreamingBaseTest.java | 12 +--
.../ratis/examples/filestore/FileStoreWriter.java | 2 +-
.../ratis/netty/server/DataStreamManagement.java | 63 +++++++++-------
.../ratis/netty/server/NettyServerStreamRpc.java | 9 ++-
ratis-proto/src/main/proto/Raft.proto | 15 ++++
.../apache/ratis/server/impl/MiniRaftCluster.java | 3 -
.../datastream/DataStreamAsyncClusterTests.java | 4 +-
.../ratis/datastream/DataStreamBaseTest.java | 5 +-
.../ratis/datastream/DataStreamClusterTests.java | 14 +++-
...ttyDataStreamChainTopologyWithGrpcCluster.java} | 2 +-
...ettyDataStreamStarTopologyWithGrpcCluster.java} | 20 ++++-
.../datastream/TestNettyDataStreamWithMock.java | 1 +
20 files changed, 329 insertions(+), 55 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
index 81c84b7..7250b4b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.client.api;
+import org.apache.ratis.protocol.RoutingTable;
+
import java.nio.ByteBuffer;
/**
@@ -44,4 +46,7 @@ public interface DataStreamApi {
/** Create a stream by providing a customized header message. */
DataStreamOutput stream(ByteBuffer headerMessage);
+
+ /** Create a stream by providing a customized header message and routing table. */
+ DataStreamOutput stream(ByteBuffer headerMessage, RoutingTable routingTable);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index ae53ab0..2342ba3 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -115,9 +115,26 @@ public interface ClientProtoUtils {
}
}
+ static RoutingTable getRoutingTable(RaftClientRequestProto p) {
+ if (!p.hasRoutingTable()) {
+ return null;
+ }
+
+ RoutingTable.Builder builder = RoutingTable.newBuilder();
+ for (RouteProto routeProto : p.getRoutingTable().getRoutesList()) {
+ RaftPeerId from = RaftPeerId.valueOf(routeProto.getPeerId().getId());
+ List<RaftPeerId> to = routeProto.getSuccessorsList().stream()
+ .map(v -> RaftPeerId.valueOf(v.getId())).collect(Collectors.toList());
+ builder.addSuccessors(from, to);
+ }
+
+ return builder.build();
+ }
+
static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
final RaftClientRequest.Type type = toRaftClientRequestType(p);
final RaftRpcRequestProto request = p.getRpcRequest();
+
return new RaftClientRequest(
ClientId.valueOf(request.getRequestorId()),
RaftPeerId.valueOf(request.getReplyId()),
@@ -125,7 +142,8 @@ public interface ClientProtoUtils {
request.getCallId(),
toMessage(p.getMessage()),
type,
- request.getSlidingWindowEntry());
+ request.getSlidingWindowEntry(),
+ getRoutingTable(p));
}
static ByteBuffer toRaftClientRequestProtoByteBuffer(RaftClientRequest request) {
@@ -139,6 +157,10 @@ public interface ClientProtoUtils {
b.setMessage(toClientMessageEntryProtoBuilder(request.getMessage()));
}
+ if (request.getRoutingTable() != null) {
+ b.setRoutingTable(request.getRoutingTable().toProto());
+ }
+
final RaftClientRequest.Type type = request.getType();
switch (type.getTypeCase()) {
case WRITE:
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index a893d85..35dd56c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -183,10 +183,16 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public DataStreamOutputRpc stream(ByteBuffer headerMessage) {
+ return stream(headerMessage, null);
+ }
+
+ @Override
+ public DataStreamOutputRpc stream(ByteBuffer headerMessage, RoutingTable routingTable) {
final Message message =
Optional.ofNullable(headerMessage).map(ByteString::copyFrom).map(Message::valueOf).orElse(null);
RaftClientRequest request = new RaftClientRequest(clientId, dataStreamServer.getId(), groupId,
- CallId.getAndIncrement(), message, RaftClientRequest.dataStreamRequestType(), null);
+ CallId.getAndIncrement(), message, RaftClientRequest.dataStreamRequestType(), null,
+ routingTable);
return new DataStreamOutputImpl(request);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 85c8620..1b745c9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -235,17 +235,28 @@ public class RaftClientRequest extends RaftClientMessage {
private final SlidingWindowEntry slidingWindowEntry;
+ private final RoutingTable routingTable;
+
public RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
- this(clientId, serverId, groupId, callId, null, type, null);
+ this(clientId, serverId, groupId, callId, null, type, null, null);
}
public RaftClientRequest(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry) {
+ this(clientId, serverId, groupId, callId, message, type, slidingWindowEntry, null);
+ }
+
+ @SuppressWarnings("parameternumber")
+ public RaftClientRequest(
+ ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+ long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry,
+ RoutingTable routingTable) {
super(clientId, serverId, groupId, callId);
this.message = message;
this.type = type;
this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
+ this.routingTable = routingTable;
}
@Override
@@ -269,6 +280,10 @@ public class RaftClientRequest extends RaftClientMessage {
return getType().is(typeCase);
}
+ public RoutingTable getRoutingTable() {
+ return routingTable;
+ }
+
@Override
public String toString() {
return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
index 1328e1a..5da8969 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
@@ -17,13 +17,16 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
/**
* Id of Raft Peer which is globally unique.
@@ -51,15 +54,28 @@ public final class RaftPeerId {
/** The corresponding bytes of {@link #idString}. */
private final ByteString id;
+ private final Supplier<RaftPeerIdProto> raftPeerIdProto;
+
private RaftPeerId(String id) {
this.idString = Objects.requireNonNull(id, "id == null");
this.id = ByteString.copyFrom(idString, StandardCharsets.UTF_8);
+ this.raftPeerIdProto = JavaUtils.memoize(this::buildRaftPeerIdProto);
}
private RaftPeerId(ByteString id) {
this.id = Objects.requireNonNull(id, "id == null");
Preconditions.assertTrue(id.size() > 0, "id is empty.");
this.idString = id.toString(StandardCharsets.UTF_8);
+ this.raftPeerIdProto = JavaUtils.memoize(this::buildRaftPeerIdProto);
+ }
+
+ private RaftPeerIdProto buildRaftPeerIdProto() {
+ final RaftPeerIdProto.Builder builder = RaftPeerIdProto.newBuilder().setId(id);
+ return builder.build();
+ }
+
+ public RaftPeerIdProto getRaftPeerIdProto() {
+ return raftPeerIdProto.get();
}
/**
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
new file mode 100644
index 0000000..2083f42
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.proto.RaftProtos.RoutingTableProto;
+import org.apache.ratis.util.ProtoUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public interface RoutingTable {
+ Set<RaftPeerId> getSuccessors(RaftPeerId peerId);
+
+ RoutingTableProto toProto();
+
+ static Builder newBuilder() {
+ return new Builder();
+ }
+
+
+ class Builder {
+ private final AtomicReference<Map<RaftPeerId, Set<RaftPeerId>>> ref = new AtomicReference<>(new HashMap<>());
+
+ private Builder() {}
+
+ private Set<RaftPeerId> computeIfAbsent(RaftPeerId peerId) {
+ return Optional.ofNullable(ref.get())
+ .map(map -> map.computeIfAbsent(peerId, key -> new HashSet<>()))
+ .orElseThrow(() -> new IllegalStateException("Already built"));
+ }
+
+ public Builder addSuccessor(RaftPeerId peerId, RaftPeerId successor) {
+ computeIfAbsent(peerId).add(successor);
+ return this;
+ }
+
+ public Builder addSuccessors(RaftPeerId peerId, Collection<RaftPeerId> successors) {
+ computeIfAbsent(peerId).addAll(successors);
+ return this;
+ }
+
+ public Builder addSuccessors(RaftPeerId peerId, RaftPeerId... successors) {
+ return addSuccessors(peerId, Arrays.asList(successors));
+ }
+
+ public RoutingTable build() {
+ return Optional.ofNullable(ref.getAndSet(null))
+ .map(RoutingTable::newRoutingTable)
+ .orElseThrow(() -> new IllegalStateException("RoutingTable Already built"));
+ }
+ }
+
+ static RoutingTable newRoutingTable(Map<RaftPeerId, Set<RaftPeerId>> map){
+ return new RoutingTable() {
+ @Override
+ public Set<RaftPeerId> getSuccessors(RaftPeerId peerId) {
+ return Optional.ofNullable(map.get(peerId)).orElseGet(Collections::emptySet);
+ }
+
+ @Override
+ public RoutingTableProto toProto() {
+ return RoutingTableProto.newBuilder().addAllRoutes(ProtoUtils.toRouteProtos(map)).build();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 2055869..d36141c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -17,7 +17,9 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.RouteProto;
import org.apache.ratis.proto.RaftProtos.ThrowableProto;
import org.apache.ratis.proto.RaftProtos.RaftGroupIdProto;
import org.apache.ratis.proto.RaftProtos.RaftGroupMemberIdProto;
@@ -39,7 +41,9 @@ import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -121,6 +125,14 @@ public interface ProtoUtils {
return protos.stream().map(ProtoUtils::toRaftPeer).collect(Collectors.toList());
}
+ static RaftPeerId toRaftPeerId(RaftPeerIdProto p) {
+ return RaftPeerId.valueOf(p.getId());
+ }
+
+ static List<RaftPeerId> toRaftPeerIds(List<RaftPeerIdProto> protos) {
+ return protos.stream().map(ProtoUtils::toRaftPeerId).collect(Collectors.toList());
+ }
+
static Iterable<RaftPeerProto> toRaftPeerProtos(
final Collection<RaftPeer> peers) {
return () -> new Iterator<RaftPeerProto>() {
@@ -138,6 +150,44 @@ public interface ProtoUtils {
};
}
+ static Iterable<RaftPeerIdProto> toRaftPeerIdProtos(
+ final Collection<RaftPeerId> peers) {
+ return () -> new Iterator<RaftPeerIdProto>() {
+ private final Iterator<RaftPeerId> i = peers.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return i.hasNext();
+ }
+
+ @Override
+ public RaftPeerIdProto next() {
+ return i.next().getRaftPeerIdProto();
+ }
+ };
+ }
+
+ static Iterable<RouteProto> toRouteProtos(
+ final Map<RaftPeerId, Set<RaftPeerId>> routingTable) {
+ return () -> new Iterator<RouteProto>() {
+ private final Iterator<Map.Entry<RaftPeerId, Set<RaftPeerId>>> i = routingTable.entrySet().iterator();
+
+ @Override
+ public boolean hasNext() {
+ return i.hasNext();
+ }
+
+ @Override
+ public RouteProto next() {
+ Map.Entry<RaftPeerId, Set<RaftPeerId>> entry = i.next();
+ return RouteProto.newBuilder()
+ .setPeerId(entry.getKey().getRaftPeerIdProto())
+ .addAllSuccessors(toRaftPeerIdProtos(entry.getValue()))
+ .build();
+ }
+ };
+ }
+
static RaftGroupId toRaftGroupId(RaftGroupIdProto proto) {
return RaftGroupId.valueOf(proto.getId());
}
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 6267c3a..3b3d8cc 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -19,10 +19,15 @@ package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.proto.RaftProtos.RoutingTableProto;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedRunnable;
import org.junit.After;
@@ -35,7 +40,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
@@ -65,6 +77,20 @@ public abstract class BaseTest {
}
}
+ public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
+ RoutingTable.Builder builder = RoutingTable.newBuilder();
+ RaftPeer previous = primary;
+ for (RaftPeer peer : peers) {
+ if (peer.equals(primary)) {
+ continue;
+ }
+ builder.addSuccessor(previous.getId(), peer.getId());
+ previous = peer;
+ }
+
+ return builder.build();
+ }
+
@After
public void assertNoFailures() {
final Throwable e = firstException.get();
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
index 598dc0c..59a83c5 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
@@ -67,10 +67,10 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
final RaftGroup raftGroup = cluster.getGroup();
final Collection<RaftPeer> peers = raftGroup.getPeers();
Assert.assertEquals(NUM_PEERS, peers.size());
- RaftPeer raftPeer = peers.iterator().next();
+ RaftPeer primary = peers.iterator().next();
final CheckedSupplier<FileStoreClient, IOException> newClient =
- () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+ () -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient);
testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient);
@@ -88,10 +88,10 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
final RaftGroup raftGroup = cluster.getGroup();
final Collection<RaftPeer> peers = raftGroup.getPeers();
Assert.assertEquals(NUM_PEERS, peers.size());
- RaftPeer raftPeer = peers.iterator().next();
+ RaftPeer primary = peers.iterator().next();
final CheckedSupplier<FileStoreClient, IOException> newClient =
- () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+ () -> new FileStoreClient(cluster.getGroup(), getProperties(), primary);
testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient);
testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient);
@@ -133,8 +133,4 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
future.get();
}
}
-
- static class StreamWriter {
-
- }
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
index 991ce82..530a618 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -134,7 +134,7 @@ class FileStoreWriter implements Closeable {
return this;
}
- public FileStoreWriter streamWriteAndVerify() throws IOException {
+ public FileStoreWriter streamWriteAndVerify() {
final int size = fileSize.getSizeInt();
final DataStreamOutput dataStreamOutput = client.getStreamOutput(fileName, size);
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 121c69e..2bde5f0 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -34,8 +34,11 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServer.Division;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -56,6 +59,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -82,11 +86,6 @@ public class DataStreamManagement {
return composeAsync(writeFuture, executor,
n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, options, stream), executor));
}
-
- CompletableFuture<Long> close(Executor executor) {
- return composeAsync(writeFuture, executor,
- n -> streamFuture.thenApplyAsync(DataStreamManagement::close, executor));
- }
}
static class RemoteStream {
@@ -99,26 +98,27 @@ public class DataStreamManagement {
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
return out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions());
}
-
- CompletableFuture<DataStreamReply> close() {
- return out.closeAsync();
- }
}
static class StreamInfo {
private final RaftClientRequest request;
private final boolean primary;
private final LocalStream local;
- private final List<RemoteStream> remotes;
+ private final Set<RemoteStream> remotes;
+ private final RaftServer server;
private final AtomicReference<CompletableFuture<Void>> previous
= new AtomicReference<>(CompletableFuture.completedFuture(null));
- StreamInfo(RaftClientRequest request, boolean primary,
- CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
+ StreamInfo(RaftClientRequest request, boolean primary, CompletableFuture<DataStream> stream, RaftServer server,
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams)
+ throws IOException {
this.request = request;
this.primary = primary;
this.local = new LocalStream(stream);
- this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toList());
+ this.server = server;
+ final Set<RaftPeer> successors = getSuccessors(server.getId());
+ final Set<DataStreamOutputRpc> outs = getStreams.apply(request, successors);
+ this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toSet());
}
AtomicReference<CompletableFuture<Void>> getPrevious() {
@@ -145,6 +145,26 @@ public class DataStreamManagement {
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + request;
}
+
+ private Set<RaftPeer> getSuccessors(RaftPeerId peerId) throws IOException {
+ final RaftGroupId groupId = request.getRaftGroupId();
+ final RaftConfiguration conf = server.getDivision(groupId).getRaftConf();
+ final RoutingTable routingTable = request.getRoutingTable();
+
+ if (routingTable != null) {
+ return routingTable.getSuccessors(peerId).stream().map(conf::getPeer).collect(Collectors.toSet());
+ }
+
+ if (isPrimary()) {
+ // Default star topology
+ // get the other peers from the current configuration
+ return conf.getCurrentPeers().stream()
+ .filter(p -> !p.getId().equals(server.getId()))
+ .collect(Collectors.toSet());
+ }
+
+ return Collections.emptySet();
+ }
}
static class StreamMap {
@@ -232,23 +252,12 @@ public class DataStreamManagement {
}
private StreamInfo newStreamInfo(ByteBuf buf,
- CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
try {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final boolean isPrimary = server.getId().equals(request.getServerId());
- final List<DataStreamOutputRpc> outs;
- if (isPrimary) {
- final RaftGroupId groupId = request.getRaftGroupId();
- // get the other peers from the current configuration
- final List<RaftPeer> others = server.getDivision(groupId).getRaftConf().getCurrentPeers().stream()
- .filter(p -> !p.getId().equals(server.getId()))
- .collect(Collectors.toList());
- outs = getStreams.apply(request, others);
- } else {
- outs = Collections.emptyList();
- }
- return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request), outs);
+ return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request), server, getStreams);
} catch (Throwable e) {
throw new CompletionException(e);
}
@@ -362,7 +371,7 @@ public class DataStreamManagement {
}
void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
- CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 82811b7..a653d9b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -54,10 +54,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -77,8 +78,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
map.addRaftPeers(newPeers);
}
- List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, List<RaftPeer> peers) throws IOException {
- final List<DataStreamOutputRpc> outs = new ArrayList<>();
+ Set<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers) throws IOException {
+ final Set<DataStreamOutputRpc> outs = new HashSet<>();
try {
getDataStreamOutput(request, peers, outs);
} catch (IOException e) {
@@ -88,7 +89,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return outs;
}
- private void getDataStreamOutput(RaftClientRequest request, List<RaftPeer> peers, List<DataStreamOutputRpc> outs)
+ private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers, Set<DataStreamOutputRpc> outs)
throws IOException {
for (RaftPeer peer : peers) {
try {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index f380581..fdb186e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -28,6 +28,10 @@ message RaftPeerProto {
string dataStreamAddress = 4; // address of the data stream server
}
+message RaftPeerIdProto {
+ bytes id = 1; // id of the peer
+}
+
message RaftGroupIdProto {
bytes id = 1;
}
@@ -280,6 +284,15 @@ message WatchRequestTypeProto {
ReplicationLevel replication = 2;
}
+message RouteProto {
+ RaftPeerIdProto peerId = 1;
+ repeated RaftPeerIdProto successors = 2;
+}
+
+message RoutingTableProto {
+ repeated RouteProto routes = 1;
+}
+
// normal client request
message RaftClientRequestProto {
RaftRpcRequestProto rpcRequest = 1;
@@ -294,6 +307,8 @@ message RaftClientRequestProto {
DataStreamRequestTypeProto dataStream = 8;
ForwardRequestTypeProto forward = 9;
}
+
+ RoutingTableProto routingTable = 10;
}
message DataStreamPacketHeaderProto {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 254732d..bd638a2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -690,9 +690,6 @@ public abstract class MiniRaftCluster implements Closeable {
}
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, RetryPolicy retryPolicy, RaftPeer primaryServer) {
- if (primaryServer == null) {
- primaryServer = CollectionUtils.random(group.getPeers());
- }
RaftClient.Builder builder = RaftClient.newBuilder()
.setRaftGroup(group)
.setLeaderId(leaderId)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 869da26..fc61148 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -37,7 +37,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -147,7 +146,8 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
try(RaftClient client = cluster.createClient(primaryServer)) {
ClientId primaryClientId = getPrimaryClientId(cluster, primaryServer);
for (int i = 0; i < numStreams; i++) {
- final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
+ final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
+ .stream(null, getRoutingTable(cluster.getGroup().getPeers(), primaryServer));
futures.add(CompletableFuture.supplyAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
servers, leader, out, bufferSize, bufferNum, primaryClientId, cluster, stepDownLeader).join(), executor));
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 579a965..90d2690 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -195,6 +195,7 @@ abstract class DataStreamBaseTest extends BaseTest {
protected RaftProperties properties;
private List<Server> servers;
+ private List<RaftPeer> peers;
private RaftGroup raftGroup;
Server getPrimaryServer() {
@@ -373,6 +374,7 @@ abstract class DataStreamBaseTest extends BaseTest {
void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
raftGroup = RaftGroup.valueOf(groupId, peers);
+ this.peers = peers;
servers = new ArrayList<>(peers.size());
// start stream servers on raft peers.
for (int i = 0; i < peers.size(); i++) {
@@ -420,7 +422,8 @@ abstract class DataStreamBaseTest extends BaseTest {
Exception expectedException, Exception headerException)
throws IOException {
try (final RaftClient client = newRaftClientForDataStream(clientId)) {
- final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
+ final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
+ .stream(null, getRoutingTable(peers, getPrimaryServer().getPeer()));
if (headerException != null) {
final DataStreamReply headerReply = out.getHeaderFuture().join();
Assert.assertFalse(headerReply.isSuccess());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index d9b7f5e..c9625a6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
@@ -29,6 +30,7 @@ import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedConsumer;
import org.junit.Assert;
@@ -72,8 +74,10 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
void runTestDataStreamOutput(CLUSTER cluster) throws Exception {
final RaftClientRequest request;
final CompletableFuture<RaftClientReply> reply;
- try (RaftClient client = cluster.createClient()) {
- try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
+ final RaftPeer primaryServer = CollectionUtils.random(cluster.getGroup().getPeers());
+ try (RaftClient client = cluster.createClient(primaryServer)) {
+ try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
+ .stream(null, getRoutingTable(cluster.getGroup().getPeers(), primaryServer))) {
request = out.getHeader();
reply = out.getRaftClientReplyFuture();
@@ -90,8 +94,10 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
CheckedConsumer<DataStreamOutputImpl, Exception> testCase) throws Exception {
final RaftClientRequest request;
final CompletableFuture<RaftClientReply> reply;
- try (RaftClient client = cluster.createClient()) {
- try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
+ final RaftPeer primaryServer = CollectionUtils.random(cluster.getGroup().getPeers());
+ try (RaftClient client = cluster.createClient(primaryServer)) {
+ try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
+ .stream(null, getRoutingTable(cluster.getGroup().getPeers(), primaryServer))) {
request = out.getHeader();
reply = out.getRaftClientReplyFuture();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
similarity index 94%
copy from ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
copy to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
index 13121c3..e4e9fef 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.datastream;
-public class TestNettyDataStreamWithGrpcCluster
+public class TestNettyDataStreamChainTopologyWithGrpcCluster
extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
similarity index 59%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
index 13121c3..cd6bbc7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
@@ -17,7 +17,25 @@
*/
package org.apache.ratis.datastream;
-public class TestNettyDataStreamWithGrpcCluster
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TestNettyDataStreamStarTopologyWithGrpcCluster
extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
+
+  @Override
+  public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
+    final RoutingTable.Builder table = RoutingTable.newBuilder();
+    // Every peer except the primary is registered as a successor of the primary.
+    final List<RaftPeerId> successors = peers.stream().map(RaftPeer::getId)
+        .filter(id -> !id.equals(primary.getId())).collect(Collectors.toList());
+    table.addSuccessors(primary.getId(), successors);
+    return table.build();
+  }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 10803fa..9b4e36b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -92,6 +92,7 @@ public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
when(raftServer.getProperties()).thenReturn(properties);
when(raftServer.getId()).thenReturn(peerId);
+ when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build());
if (getStateMachineException == null) {
MyDivision myDivision = new MyDivision(raftServer);
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);