You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/12 12:20:15 UTC
[incubator-ratis] branch master updated: RATIS-1151. Move the
DataStream requests handling code from NettyServerStreamRpc to a new class.
(#273)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 32c8653 RATIS-1151. Move the DataStream requests handling code from NettyServerStreamRpc to a new class. (#273)
32c8653 is described below
commit 32c8653be1d743d3fb41a6037c23ecb5214113c6
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 12 20:20:06 2020 +0800
RATIS-1151. Move the DataStream requests handling code from NettyServerStreamRpc to a new class. (#273)
---
...verStreamRpc.java => DataStreamManagement.java} | 227 ++----------
.../ratis/netty/server/NettyServerStreamRpc.java | 384 +--------------------
2 files changed, 49 insertions(+), 562 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
similarity index 65%
copy from ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
copy to ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index d7d3f7c..3aaad05 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -18,115 +18,48 @@
package org.apache.ratis.netty.server;
-import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.io.CloseAsync;
-import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.*;
-import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
-import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
-import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
-import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
-import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-public class NettyServerStreamRpc implements DataStreamServerRpc {
- public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
-
- /**
- * Proxies to other peers.
- *
- * Invariant: all the {@link #peers} must exist in the {@link #map}.
- */
- static class Proxies {
- private final Set<RaftPeer> peers = new CopyOnWriteArraySet<>();
- private final PeerProxyMap<DataStreamClient> map;
-
- Proxies(PeerProxyMap<DataStreamClient> map) {
- this.map = map;
- }
-
- void addPeers(Collection<RaftPeer> newPeers) {
- // add to the map first in order to preserve the invariant.
- map.addRaftPeers(newPeers);
- // must use atomic addAll
- peers.addAll(newPeers);
- }
-
- List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request) throws IOException {
- final List<DataStreamOutputRpc> outs = new ArrayList<>();
- try {
- getDataStreamOutput(outs, request);
- } catch (IOException e) {
- outs.forEach(CloseAsync::closeAsync);
- throw e;
- }
- return outs;
- }
-
- private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftClientRequest request)
- throws IOException {
- for (RaftPeer peer : peers) {
- try {
- outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(request));
- } catch (IOException e) {
- throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
- }
- }
- }
-
- void close() {
- map.close();
- }
- }
+public class DataStreamManagement {
+ public static final Logger LOG = LoggerFactory.getLogger(DataStreamManagement.class);
static class LocalStream {
private final CompletableFuture<DataStream> streamFuture;
@@ -144,7 +77,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
CompletableFuture<Long> close(Executor executor) {
return composeAsync(writeFuture, executor,
- n -> streamFuture.thenApplyAsync(NettyServerStreamRpc::close, executor));
+ n -> streamFuture.thenApplyAsync(DataStreamManagement::close, executor));
}
}
@@ -159,8 +92,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return out.writeAsync(request.slice().nioBuffer());
}
- CompletableFuture<DataStreamReply> startTransaction(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
- Executor executor) {
+ CompletableFuture<DataStreamReply> startTransaction(DataStreamRequestByteBuf request,
+ ChannelHandlerContext ctx, Executor executor) {
return out.startTransactionAsync().thenApplyAsync(reply -> {
if (reply.isSuccess()) {
final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
@@ -267,54 +200,28 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
private final RaftServer server;
private final String name;
- private final EventLoopGroup bossGroup = new NioEventLoopGroup();
- private final EventLoopGroup workerGroup = new NioEventLoopGroup();
- private final ChannelFuture channelFuture;
private final StreamMap streams = new StreamMap();
- private final Proxies proxies;
-
private final Executor executor;
- public NettyServerStreamRpc(RaftServer server) {
+ DataStreamManagement(RaftServer server) {
this.server = server;
this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
final RaftProperties properties = server.getProperties();
- final int port = NettyConfigKeys.DataStream.port(properties);
- this.channelFuture = new ServerBootstrap()
- .group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(getInitializer())
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .bind(port);
- this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
this.executor = Executors.newFixedThreadPool(
- RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
- }
-
- static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
- return DataStreamClient.newBuilder()
- .setClientId(ClientId.randomId())
- .setDataStreamServer(peer)
- .setProperties(properties)
- .build();
+ RaftServerConfigKeys.DataStream.asyncThreadPoolSize(properties));
}
- @Override
- public void addRaftPeers(Collection<RaftPeer> newPeers) {
- proxies.addPeers(newPeers);
- }
-
- private StreamInfo newStreamInfo(ByteBuf buf) {
+ private StreamInfo newStreamInfo(ByteBuf buf,
+ CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
try {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
final boolean isPrimary = server.getId().equals(request.getServerId());
return new StreamInfo(request, isPrimary, stateMachine.data().stream(request),
- isPrimary? proxies.getDataStreamOutput(request): Collections.emptyList());
+ isPrimary? getDataStreamOutput.apply(request): Collections.emptyList());
} catch (Throwable e) {
throw new CompletionException(e);
}
@@ -349,7 +256,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
}
- private void sendReplyNotSuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, ChannelHandlerContext ctx) {
+ static void sendReplyNotSuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
request.getStreamId(), request.getStreamOffset(), buffer, -1, false, request.getType());
ctx.writeAndFlush(reply);
@@ -362,22 +269,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
ctx.writeAndFlush(reply);
}
- private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
+ static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
- if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
- sendReplyNotSuccess(request, null, ctx);
- } else {
- sendReplySuccess(request, null, bytesWritten, ctx);
- }
- }
-
- private ChannelInboundHandler newChannelInboundHandlerAdapter(){
- return new ChannelInboundHandlerAdapter(){
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- read(ctx, (DataStreamRequestByteBuf)msg);
- }
- };
+ if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
+ sendReplyNotSuccess(request, null, ctx);
+ } else {
+ sendReplySuccess(request, null, bytesWritten, ctx);
+ }
}
private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
@@ -390,7 +288,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
} else if (request.getType() == Type.STREAM_CLOSE) {
// if this server is not the leader, forward start transition to the other peers
// there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
- forwardStartTransaction(info, request, ctx, reply);
+ forwardStartTransaction(info, request, reply, ctx, executor);
} else if (request.getType() == Type.START_TRANSACTION){
ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
sendReplyNotSuccess(request, buffer, ctx);
@@ -404,13 +302,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
}
- private void sendLeaderFailedReply(final List<CompletableFuture<DataStreamReply>> results,
- final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx, RaftClientReply localReply) {
+ static void sendLeaderFailedReply(final List<CompletableFuture<DataStreamReply>> results,
+ DataStreamRequestByteBuf request, RaftClientReply localReply, ChannelHandlerContext ctx) {
// get replies from the results, ignored exceptional replies
final Stream<RaftClientReply> remoteReplies = results.stream()
.filter(r -> !r.isCompletedExceptionally())
.map(CompletableFuture::join)
- .map(this::getRaftClientReply);
+ .map(DataStreamManagement::getRaftClientReply);
// choose the leader's reply if there is any. Otherwise, use the local reply
final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
@@ -422,9 +320,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
sendReplyNotSuccess(request, buffer, ctx);
}
- private void forwardStartTransaction(
- final StreamInfo info, final DataStreamRequestByteBuf request,
- final ChannelHandlerContext ctx, RaftClientReply localReply) {
+ static void forwardStartTransaction(StreamInfo info, DataStreamRequestByteBuf request, RaftClientReply localReply,
+ ChannelHandlerContext ctx, Executor executor) {
final List<CompletableFuture<DataStreamReply>> results = info.applyToRemotes(
out -> out.startTransaction(request, ctx, executor));
@@ -435,24 +332,25 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
}
- sendLeaderFailedReply(results, request, ctx, localReply);
+ sendLeaderFailedReply(results, request, localReply, ctx);
});
}
- private RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
+ static RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
if (dataStreamReply instanceof DataStreamReplyByteBuffer) {
try {
return ClientProtoUtils.toRaftClientReply(
RaftClientReplyProto.parseFrom(((DataStreamReplyByteBuffer) dataStreamReply).slice()));
} catch (InvalidProtocolBufferException e) {
- throw new IllegalStateException(this + ": Failed to decode RaftClientReply");
+ throw new IllegalStateException("Failed to decode RaftClientReply");
}
} else {
- throw new IllegalStateException(this + ": Unexpected reply type");
+ throw new IllegalStateException("Unexpected reply type");
}
}
- private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
+ void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
+ CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
@@ -469,7 +367,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites;
if (request.getType() == Type.STREAM_HEADER) {
- info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
+ info = streams.computeIfAbsent(key, id -> newStreamInfo(buf, getDataStreamOutput));
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
@@ -505,8 +403,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}, executor));
}
- private boolean checkSuccessRemoteWrite(
- List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
+ static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
final DataStreamReply reply = replyFuture.join();
if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
@@ -516,60 +413,6 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return true;
}
- private ChannelInitializer<SocketChannel> getInitializer(){
- return new ChannelInitializer<SocketChannel>(){
- @Override
- public void initChannel(SocketChannel ch) {
- ChannelPipeline p = ch.pipeline();
- p.addLast(newDecoder());
- p.addLast(newEncoder());
- p.addLast(newChannelInboundHandlerAdapter());
- }
- };
- }
-
- ByteToMessageDecoder newDecoder() {
- return new ByteToMessageDecoder() {
- {
- this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
- }
-
- @Override
- protected void decode(ChannelHandlerContext context, ByteBuf buf, List<Object> out) {
- Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamRequestByteBuf(buf)).ifPresent(out::add);
- }
- };
- }
-
- MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
- return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
- @Override
- protected void encode(ChannelHandlerContext context, DataStreamReplyByteBuffer reply, List<Object> out) {
- NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add, context.alloc());
- }
- };
- }
-
- @Override
- public void start() {
- channelFuture.syncUninterruptibly();
- }
-
- @Override
- public void close() {
- try {
- channelFuture.channel().close().sync();
- bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
- workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
- bossGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
- workerGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.error(this + ": Interrupted close()", e);
- }
-
- proxies.close();
- }
-
@Override
public String toString() {
return name;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index d7d3f7c..125160d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,29 +20,26 @@ package org.apache.ratis.netty.server;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamOutputRpc;
-import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.StateMachine.DataStream;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.*;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -56,27 +53,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
public class NettyServerStreamRpc implements DataStreamServerRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -128,160 +111,21 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
}
- static class LocalStream {
- private final CompletableFuture<DataStream> streamFuture;
- private final AtomicReference<CompletableFuture<Long>> writeFuture;
-
- LocalStream(CompletableFuture<DataStream> streamFuture) {
- this.streamFuture = streamFuture;
- this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 0L));
- }
-
- CompletableFuture<Long> write(ByteBuf buf, Executor executor) {
- return composeAsync(writeFuture, executor,
- n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, stream), executor));
- }
-
- CompletableFuture<Long> close(Executor executor) {
- return composeAsync(writeFuture, executor,
- n -> streamFuture.thenApplyAsync(NettyServerStreamRpc::close, executor));
- }
- }
-
- static class RemoteStream {
- private final DataStreamOutputRpc out;
-
- RemoteStream(DataStreamOutputRpc out) {
- this.out = out;
- }
-
- CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
- return out.writeAsync(request.slice().nioBuffer());
- }
-
- CompletableFuture<DataStreamReply> startTransaction(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
- Executor executor) {
- return out.startTransactionAsync().thenApplyAsync(reply -> {
- if (reply.isSuccess()) {
- final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
- ((DataStreamReplyByteBuffer)reply).slice(): null;
- sendReplySuccess(request, buffer, -1, ctx);
- }
- return reply;
- }, executor);
- }
-
- CompletableFuture<DataStreamReply> close() {
- return out.closeAsync();
- }
- }
-
- static class StreamInfo {
- private final RaftClientRequest request;
- private final boolean primary;
- private final LocalStream local;
- private final List<RemoteStream> remotes;
- private final AtomicReference<CompletableFuture<Void>> previous
- = new AtomicReference<>(CompletableFuture.completedFuture(null));
-
- StreamInfo(RaftClientRequest request, boolean primary,
- CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
- this.request = request;
- this.primary = primary;
- this.local = new LocalStream(stream);
- this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toList());
- }
-
- AtomicReference<CompletableFuture<Void>> getPrevious() {
- return previous;
- }
-
- RaftClientRequest getRequest() {
- return request;
- }
-
- boolean isPrimary() {
- return primary;
- }
-
- LocalStream getLocal() {
- return local;
- }
-
- <T> List<T> applyToRemotes(Function<RemoteStream, T> function) {
- return remotes.isEmpty()?Collections.emptyList(): remotes.stream().map(function).collect(Collectors.toList());
- }
-
- @Override
- public String toString() {
- return JavaUtils.getClassSimpleName(getClass()) + ":" + request;
- }
- }
-
- static class StreamMap {
- static class Key {
- private final ChannelId channelId;
- private final long streamId;
-
- Key(ChannelId channelId, long streamId) {
- this.channelId = channelId;
- this.streamId = streamId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- } else if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final Key that = (Key) obj;
- return this.streamId == that.streamId && Objects.equals(this.channelId, that.channelId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(channelId, streamId);
- }
-
- @Override
- public String toString() {
- return channelId + "-" + streamId;
- }
- }
-
- private final ConcurrentMap<Key, StreamInfo> map = new ConcurrentHashMap<>();
-
- StreamInfo computeIfAbsent(Key key, Function<Key, StreamInfo> function) {
- final StreamInfo info = map.computeIfAbsent(key, function);
- LOG.debug("computeIfAbsent({}) returns {}", key, info);
- return info;
- }
-
- StreamInfo get(Key key) {
- final StreamInfo info = map.get(key);
- LOG.debug("get({}) returns {}", key, info);
- return info;
- }
- }
-
- private final RaftServer server;
private final String name;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ChannelFuture channelFuture;
- private final StreamMap streams = new StreamMap();
+ private final DataStreamManagement requests;
private final Proxies proxies;
- private final Executor executor;
-
public NettyServerStreamRpc(RaftServer server) {
- this.server = server;
this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
+ this.requests = new DataStreamManagement(server);
final RaftProperties properties = server.getProperties();
final int port = NettyConfigKeys.DataStream.port(properties);
+ this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
this.channelFuture = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
@@ -289,9 +133,6 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
.childHandler(getInitializer())
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind(port);
- this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
- this.executor = Executors.newFixedThreadPool(
- RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
}
static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
@@ -307,213 +148,16 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
proxies.addPeers(newPeers);
}
- private StreamInfo newStreamInfo(ByteBuf buf) {
- try {
- final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
- RaftClientRequestProto.parseFrom(buf.nioBuffer()));
- final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
- final boolean isPrimary = server.getId().equals(request.getServerId());
- return new StreamInfo(request, isPrimary, stateMachine.data().stream(request),
- isPrimary? proxies.getDataStreamOutput(request): Collections.emptyList());
- } catch (Throwable e) {
- throw new CompletionException(e);
- }
- }
-
- static <T> CompletableFuture<T> composeAsync(AtomicReference<CompletableFuture<T>> future, Executor executor,
- Function<T, CompletableFuture<T>> function) {
- final CompletableFuture<T> composed = future.get().thenComposeAsync(function, executor);
- future.set(composed);
- return composed;
- }
-
- static long writeTo(ByteBuf buf, DataStream stream) {
- final WritableByteChannel channel = stream.getWritableByteChannel();
- long byteWritten = 0;
- for (ByteBuffer buffer : buf.nioBuffers()) {
- try {
- byteWritten += channel.write(buffer);
- } catch (Throwable t) {
- throw new CompletionException(t);
- }
- }
- return byteWritten;
- }
-
- static long close(DataStream stream) {
- try {
- stream.getWritableByteChannel().close();
- return 0L;
- } catch (IOException e) {
- throw new CompletionException("Failed to close " + stream, e);
- }
- }
-
- private void sendReplyNotSuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, ChannelHandlerContext ctx) {
- final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(), buffer, -1, false, request.getType());
- ctx.writeAndFlush(reply);
- }
-
- static void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, long bytesWritten,
- ChannelHandlerContext ctx) {
- final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(), buffer, bytesWritten, true, request.getType());
- ctx.writeAndFlush(reply);
- }
-
- private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
- DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
- if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
- sendReplyNotSuccess(request, null, ctx);
- } else {
- sendReplySuccess(request, null, bytesWritten, ctx);
- }
- }
private ChannelInboundHandler newChannelInboundHandlerAdapter(){
return new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- read(ctx, (DataStreamRequestByteBuf)msg);
- }
- };
- }
-
- private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
- ChannelHandlerContext ctx) {
- try {
- return server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
- if (reply.isSuccess()) {
- ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
- sendReplySuccess(request, buffer, -1, ctx);
- } else if (request.getType() == Type.STREAM_CLOSE) {
- // if this server is not the leader, forward start transition to the other peers
- // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
- forwardStartTransaction(info, request, ctx, reply);
- } else if (request.getType() == Type.START_TRANSACTION){
- ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
- sendReplyNotSuccess(request, buffer, ctx);
- } else {
- throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
+ if (msg instanceof DataStreamRequestByteBuf) {
+ requests.read((DataStreamRequestByteBuf)msg, ctx, proxies::getDataStreamOutput);
}
- }, executor);
- } catch (IOException e) {
- sendReplyNotSuccess(request, null, ctx);
- return CompletableFuture.completedFuture(null);
- }
- }
-
- private void sendLeaderFailedReply(final List<CompletableFuture<DataStreamReply>> results,
- final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx, RaftClientReply localReply) {
- // get replies from the results, ignored exceptional replies
- final Stream<RaftClientReply> remoteReplies = results.stream()
- .filter(r -> !r.isCompletedExceptionally())
- .map(CompletableFuture::join)
- .map(this::getRaftClientReply);
-
- // choose the leader's reply if there is any. Otherwise, use the local reply
- final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
- .filter(reply -> reply.getNotLeaderException() == null)
- .findAny().orElse(localReply);
-
- // send reply
- final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(chosen).toByteString().asReadOnlyByteBuffer();
- sendReplyNotSuccess(request, buffer, ctx);
- }
-
- private void forwardStartTransaction(
- final StreamInfo info, final DataStreamRequestByteBuf request,
- final ChannelHandlerContext ctx, RaftClientReply localReply) {
- final List<CompletableFuture<DataStreamReply>> results = info.applyToRemotes(
- out -> out.startTransaction(request, ctx, executor));
-
- JavaUtils.allOf(results).thenAccept(v -> {
- for (CompletableFuture<DataStreamReply> result : results) {
- if (result.join().isSuccess()) {
- return;
- }
- }
-
- sendLeaderFailedReply(results, request, ctx, localReply);
- });
- }
-
- private RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
- if (dataStreamReply instanceof DataStreamReplyByteBuffer) {
- try {
- return ClientProtoUtils.toRaftClientReply(
- RaftClientReplyProto.parseFrom(((DataStreamReplyByteBuffer) dataStreamReply).slice()));
- } catch (InvalidProtocolBufferException e) {
- throw new IllegalStateException(this + ": Failed to decode RaftClientReply");
}
- } else {
- throw new IllegalStateException(this + ": Unexpected reply type");
- }
- }
-
- private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
- LOG.debug("{}: read {}", this, request);
- final ByteBuf buf = request.slice();
- final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-
- if (request.getType() == Type.START_TRANSACTION) {
- // for peers to start transaction
- final StreamInfo info = streams.get(key);
- composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
- .thenAccept(v -> buf.release());
- return;
- }
-
- final StreamInfo info;
- final CompletableFuture<Long> localWrite;
- final List<CompletableFuture<DataStreamReply>> remoteWrites;
- if (request.getType() == Type.STREAM_HEADER) {
- info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
- localWrite = CompletableFuture.completedFuture(0L);
- remoteWrites = Collections.emptyList();
- } else if (request.getType() == Type.STREAM_DATA) {
- info = streams.get(key);
- localWrite = info.getLocal().write(buf, executor);
- remoteWrites = info.applyToRemotes(out -> out.write(request));
- } else if (request.getType() == Type.STREAM_CLOSE) {
- info = streams.get(key);
- localWrite = info.getLocal().close(executor);
- remoteWrites = info.isPrimary()? info.applyToRemotes(RemoteStream::close): Collections.emptyList();
- } else {
- throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
- }
-
- composeAsync(info.getPrevious(), executor, n -> JavaUtils.allOf(remoteWrites)
- .thenCombineAsync(localWrite, (v, bytesWritten) -> {
- if (request.getType() == Type.STREAM_HEADER
- || request.getType() == Type.STREAM_DATA) {
- sendReply(remoteWrites, request, bytesWritten, ctx);
- } else if (request.getType() == Type.STREAM_CLOSE) {
- if (info.isPrimary()) {
- // after all server close stream, primary server start transaction
- // TODO(runzhiwang): send start transaction to leader directly
- startTransaction(info, request, ctx);
- } else {
- sendReply(remoteWrites, request, bytesWritten, ctx);
- }
- } else {
- throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
- }
- buf.release();
- return null;
- }, executor));
- }
-
- private boolean checkSuccessRemoteWrite(
- List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
- for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
- final DataStreamReply reply = replyFuture.join();
- if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
- return false;
- }
- }
- return true;
+ };
}
private ChannelInitializer<SocketChannel> getInitializer(){