You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/12 12:20:15 UTC

[incubator-ratis] branch master updated: RATIS-1151. Move the DataStream requests handling code from NettyServerStreamRpc to a new class. (#273)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c8653  RATIS-1151. Move the DataStream requests handling code from NettyServerStreamRpc to a new class. (#273)
32c8653 is described below

commit 32c8653be1d743d3fb41a6037c23ecb5214113c6
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 12 20:20:06 2020 +0800

    RATIS-1151. Move the DataStream requests handling code from NettyServerStreamRpc to a new class. (#273)
---
 ...verStreamRpc.java => DataStreamManagement.java} | 227 ++----------
 .../ratis/netty/server/NettyServerStreamRpc.java   | 384 +--------------------
 2 files changed, 49 insertions(+), 562 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
similarity index 65%
copy from ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
copy to ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index d7d3f7c..3aaad05 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -18,115 +18,48 @@
 
 package org.apache.ratis.netty.server;
 
-import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.io.CloseAsync;
-import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.*;
-import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
-import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
-import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
-import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
-import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-public class NettyServerStreamRpc implements DataStreamServerRpc {
-  public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
-
-  /**
-   * Proxies to other peers.
-   *
-   * Invariant: all the {@link #peers} must exist in the {@link #map}.
-   */
-  static class Proxies {
-    private final Set<RaftPeer> peers = new CopyOnWriteArraySet<>();
-    private final PeerProxyMap<DataStreamClient> map;
-
-    Proxies(PeerProxyMap<DataStreamClient> map) {
-      this.map = map;
-    }
-
-    void addPeers(Collection<RaftPeer> newPeers) {
-      // add to the map first in order to preserve the invariant.
-      map.addRaftPeers(newPeers);
-      // must use atomic addAll
-      peers.addAll(newPeers);
-    }
-
-    List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request) throws IOException {
-      final List<DataStreamOutputRpc> outs = new ArrayList<>();
-      try {
-        getDataStreamOutput(outs, request);
-      } catch (IOException e) {
-        outs.forEach(CloseAsync::closeAsync);
-        throw e;
-      }
-      return outs;
-    }
-
-    private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftClientRequest request)
-        throws IOException {
-      for (RaftPeer peer : peers) {
-        try {
-          outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(request));
-        } catch (IOException e) {
-          throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
-        }
-      }
-    }
-
-    void close() {
-      map.close();
-    }
-  }
+public class DataStreamManagement {
+  public static final Logger LOG = LoggerFactory.getLogger(DataStreamManagement.class);
 
   static class LocalStream {
     private final CompletableFuture<DataStream> streamFuture;
@@ -144,7 +77,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
     CompletableFuture<Long> close(Executor executor) {
       return composeAsync(writeFuture, executor,
-          n -> streamFuture.thenApplyAsync(NettyServerStreamRpc::close, executor));
+          n -> streamFuture.thenApplyAsync(DataStreamManagement::close, executor));
     }
   }
 
@@ -159,8 +92,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return out.writeAsync(request.slice().nioBuffer());
     }
 
-    CompletableFuture<DataStreamReply> startTransaction(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
-        Executor executor) {
+    CompletableFuture<DataStreamReply> startTransaction(DataStreamRequestByteBuf request,
+        ChannelHandlerContext ctx, Executor executor) {
       return out.startTransactionAsync().thenApplyAsync(reply -> {
         if (reply.isSuccess()) {
           final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
@@ -267,54 +200,28 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
   private final RaftServer server;
   private final String name;
-  private final EventLoopGroup bossGroup = new NioEventLoopGroup();
-  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
-  private final ChannelFuture channelFuture;
 
   private final StreamMap streams = new StreamMap();
-  private final Proxies proxies;
-
   private final Executor executor;
 
-  public NettyServerStreamRpc(RaftServer server) {
+  DataStreamManagement(RaftServer server) {
     this.server = server;
     this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
 
     final RaftProperties properties = server.getProperties();
-    final int port = NettyConfigKeys.DataStream.port(properties);
-    this.channelFuture = new ServerBootstrap()
-        .group(bossGroup, workerGroup)
-        .channel(NioServerSocketChannel.class)
-        .handler(new LoggingHandler(LogLevel.INFO))
-        .childHandler(getInitializer())
-        .childOption(ChannelOption.SO_KEEPALIVE, true)
-        .bind(port);
-    this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
     this.executor = Executors.newFixedThreadPool(
-        RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
-  }
-
-  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
-    return DataStreamClient.newBuilder()
-        .setClientId(ClientId.randomId())
-        .setDataStreamServer(peer)
-        .setProperties(properties)
-        .build();
+        RaftServerConfigKeys.DataStream.asyncThreadPoolSize(properties));
   }
 
-  @Override
-  public void addRaftPeers(Collection<RaftPeer> newPeers) {
-    proxies.addPeers(newPeers);
-  }
-
-  private StreamInfo newStreamInfo(ByteBuf buf) {
+  private StreamInfo newStreamInfo(ByteBuf buf,
+      CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
     try {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
       final boolean isPrimary = server.getId().equals(request.getServerId());
       return new StreamInfo(request, isPrimary, stateMachine.data().stream(request),
-          isPrimary? proxies.getDataStreamOutput(request): Collections.emptyList());
+          isPrimary? getDataStreamOutput.apply(request): Collections.emptyList());
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
@@ -349,7 +256,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     }
   }
 
-  private void sendReplyNotSuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, ChannelHandlerContext ctx) {
+  static void sendReplyNotSuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
         request.getStreamId(), request.getStreamOffset(), buffer, -1, false, request.getType());
     ctx.writeAndFlush(reply);
@@ -362,22 +269,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     ctx.writeAndFlush(reply);
   }
 
-  private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
+  static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
       DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
-      if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
-        sendReplyNotSuccess(request, null, ctx);
-      } else {
-        sendReplySuccess(request, null, bytesWritten, ctx);
-      }
-  }
-
-  private ChannelInboundHandler newChannelInboundHandlerAdapter(){
-    return new ChannelInboundHandlerAdapter(){
-      @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        read(ctx, (DataStreamRequestByteBuf)msg);
-      }
-    };
+    if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
+      sendReplyNotSuccess(request, null, ctx);
+    } else {
+      sendReplySuccess(request, null, bytesWritten, ctx);
+    }
   }
 
   private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
@@ -390,7 +288,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the other peers
           // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx, reply);
+          forwardStartTransaction(info, request, reply, ctx, executor);
         } else if (request.getType() == Type.START_TRANSACTION){
           ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
           sendReplyNotSuccess(request, buffer, ctx);
@@ -404,13 +302,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     }
   }
 
-  private void sendLeaderFailedReply(final List<CompletableFuture<DataStreamReply>> results,
-      final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx, RaftClientReply localReply) {
+  static void sendLeaderFailedReply(final List<CompletableFuture<DataStreamReply>> results,
+      DataStreamRequestByteBuf request, RaftClientReply localReply, ChannelHandlerContext ctx) {
     // get replies from the results, ignored exceptional replies
     final Stream<RaftClientReply> remoteReplies = results.stream()
         .filter(r -> !r.isCompletedExceptionally())
         .map(CompletableFuture::join)
-        .map(this::getRaftClientReply);
+        .map(DataStreamManagement::getRaftClientReply);
 
     // choose the leader's reply if there is any.  Otherwise, use the local reply
     final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
@@ -422,9 +320,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     sendReplyNotSuccess(request, buffer, ctx);
   }
 
-  private void forwardStartTransaction(
-      final StreamInfo info, final DataStreamRequestByteBuf request,
-      final ChannelHandlerContext ctx, RaftClientReply localReply) {
+  static void forwardStartTransaction(StreamInfo info, DataStreamRequestByteBuf request, RaftClientReply localReply,
+      ChannelHandlerContext ctx, Executor executor) {
     final List<CompletableFuture<DataStreamReply>> results = info.applyToRemotes(
         out -> out.startTransaction(request, ctx, executor));
 
@@ -435,24 +332,25 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         }
       }
 
-      sendLeaderFailedReply(results, request, ctx, localReply);
+      sendLeaderFailedReply(results, request, localReply, ctx);
     });
   }
 
-  private RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
+  static RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
     if (dataStreamReply instanceof DataStreamReplyByteBuffer) {
       try {
         return ClientProtoUtils.toRaftClientReply(
             RaftClientReplyProto.parseFrom(((DataStreamReplyByteBuffer) dataStreamReply).slice()));
       } catch (InvalidProtocolBufferException e) {
-        throw new IllegalStateException(this + ": Failed to decode RaftClientReply");
+        throw new IllegalStateException("Failed to decode RaftClientReply");
       }
     } else {
-      throw new IllegalStateException(this + ": Unexpected reply type");
+      throw new IllegalStateException("Unexpected reply type");
     }
   }
 
-  private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
+  void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
+      CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
@@ -469,7 +367,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites;
     if (request.getType() == Type.STREAM_HEADER) {
-      info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
+      info = streams.computeIfAbsent(key, id -> newStreamInfo(buf, getDataStreamOutput));
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA) {
@@ -505,8 +403,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         }, executor));
   }
 
-  private boolean checkSuccessRemoteWrite(
-          List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
+  static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
     for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
       final DataStreamReply reply = replyFuture.join();
       if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
@@ -516,60 +413,6 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     return true;
   }
 
-  private ChannelInitializer<SocketChannel> getInitializer(){
-    return new ChannelInitializer<SocketChannel>(){
-      @Override
-      public void initChannel(SocketChannel ch) {
-        ChannelPipeline p = ch.pipeline();
-        p.addLast(newDecoder());
-        p.addLast(newEncoder());
-        p.addLast(newChannelInboundHandlerAdapter());
-      }
-    };
-  }
-
-  ByteToMessageDecoder newDecoder() {
-    return new ByteToMessageDecoder() {
-      {
-        this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
-      }
-
-      @Override
-      protected void decode(ChannelHandlerContext context, ByteBuf buf, List<Object> out) {
-        Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamRequestByteBuf(buf)).ifPresent(out::add);
-      }
-    };
-  }
-
-  MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
-    return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
-      @Override
-      protected void encode(ChannelHandlerContext context, DataStreamReplyByteBuffer reply, List<Object> out) {
-        NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add, context.alloc());
-      }
-    };
-  }
-
-  @Override
-  public void start() {
-    channelFuture.syncUninterruptibly();
-  }
-
-  @Override
-  public void close() {
-    try {
-      channelFuture.channel().close().sync();
-      bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
-      workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
-      bossGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
-      workerGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      LOG.error(this + ": Interrupted close()", e);
-    }
-
-    proxies.close();
-  }
-
   @Override
   public String toString() {
     return name;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index d7d3f7c..125160d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,29 +20,26 @@ package org.apache.ratis.netty.server;
 
 import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamOutputRpc;
-import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.StateMachine.DataStream;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.*;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
 import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -56,27 +53,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -128,160 +111,21 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     }
   }
 
-  static class LocalStream {
-    private final CompletableFuture<DataStream> streamFuture;
-    private final AtomicReference<CompletableFuture<Long>> writeFuture;
-
-    LocalStream(CompletableFuture<DataStream> streamFuture) {
-      this.streamFuture = streamFuture;
-      this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 0L));
-    }
-
-    CompletableFuture<Long> write(ByteBuf buf, Executor executor) {
-      return composeAsync(writeFuture, executor,
-          n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, stream), executor));
-    }
-
-    CompletableFuture<Long> close(Executor executor) {
-      return composeAsync(writeFuture, executor,
-          n -> streamFuture.thenApplyAsync(NettyServerStreamRpc::close, executor));
-    }
-  }
-
-  static class RemoteStream {
-    private final DataStreamOutputRpc out;
-
-    RemoteStream(DataStreamOutputRpc out) {
-      this.out = out;
-    }
-
-    CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
-      return out.writeAsync(request.slice().nioBuffer());
-    }
-
-    CompletableFuture<DataStreamReply> startTransaction(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
-        Executor executor) {
-      return out.startTransactionAsync().thenApplyAsync(reply -> {
-        if (reply.isSuccess()) {
-          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
-              ((DataStreamReplyByteBuffer)reply).slice(): null;
-          sendReplySuccess(request, buffer, -1, ctx);
-        }
-        return reply;
-      }, executor);
-    }
-
-    CompletableFuture<DataStreamReply> close() {
-      return out.closeAsync();
-    }
-  }
-
-  static class StreamInfo {
-    private final RaftClientRequest request;
-    private final boolean primary;
-    private final LocalStream local;
-    private final List<RemoteStream> remotes;
-    private final AtomicReference<CompletableFuture<Void>> previous
-        = new AtomicReference<>(CompletableFuture.completedFuture(null));
-
-    StreamInfo(RaftClientRequest request, boolean primary,
-        CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
-      this.request = request;
-      this.primary = primary;
-      this.local = new LocalStream(stream);
-      this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toList());
-    }
-
-    AtomicReference<CompletableFuture<Void>> getPrevious() {
-      return previous;
-    }
-
-    RaftClientRequest getRequest() {
-      return request;
-    }
-
-    boolean isPrimary() {
-      return primary;
-    }
-
-    LocalStream getLocal() {
-      return local;
-    }
-
-    <T> List<T> applyToRemotes(Function<RemoteStream, T> function) {
-      return remotes.isEmpty()?Collections.emptyList(): remotes.stream().map(function).collect(Collectors.toList());
-    }
-
-    @Override
-    public String toString() {
-      return JavaUtils.getClassSimpleName(getClass()) + ":" + request;
-    }
-  }
-
-  static class StreamMap {
-    static class Key {
-      private final ChannelId channelId;
-      private final long streamId;
-
-      Key(ChannelId channelId, long streamId) {
-        this.channelId = channelId;
-        this.streamId = streamId;
-      }
-
-      @Override
-      public boolean equals(Object obj) {
-        if (this == obj) {
-          return true;
-        } else if (obj == null || getClass() != obj.getClass()) {
-          return false;
-        }
-        final Key that = (Key) obj;
-        return this.streamId == that.streamId && Objects.equals(this.channelId, that.channelId);
-      }
-
-      @Override
-      public int hashCode() {
-        return Objects.hash(channelId, streamId);
-      }
-
-      @Override
-      public String toString() {
-        return channelId + "-" + streamId;
-      }
-    }
-
-    private final ConcurrentMap<Key, StreamInfo> map = new ConcurrentHashMap<>();
-
-    StreamInfo computeIfAbsent(Key key, Function<Key, StreamInfo> function) {
-      final StreamInfo info = map.computeIfAbsent(key, function);
-      LOG.debug("computeIfAbsent({}) returns {}", key, info);
-      return info;
-    }
-
-    StreamInfo get(Key key) {
-      final StreamInfo info = map.get(key);
-      LOG.debug("get({}) returns {}", key, info);
-      return info;
-    }
-  }
-
-  private final RaftServer server;
   private final String name;
   private final EventLoopGroup bossGroup = new NioEventLoopGroup();
   private final EventLoopGroup workerGroup = new NioEventLoopGroup();
   private final ChannelFuture channelFuture;
 
-  private final StreamMap streams = new StreamMap();
+  private final DataStreamManagement requests;
   private final Proxies proxies;
 
-  private final Executor executor;
-
   public NettyServerStreamRpc(RaftServer server) {
-    this.server = server;
     this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
+    this.requests = new DataStreamManagement(server);
 
     final RaftProperties properties = server.getProperties();
     final int port = NettyConfigKeys.DataStream.port(properties);
+    this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
     this.channelFuture = new ServerBootstrap()
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
@@ -289,9 +133,6 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         .childHandler(getInitializer())
         .childOption(ChannelOption.SO_KEEPALIVE, true)
         .bind(port);
-    this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
-    this.executor = Executors.newFixedThreadPool(
-        RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
   }
 
   static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
@@ -307,213 +148,16 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     proxies.addPeers(newPeers);
   }
 
-  private StreamInfo newStreamInfo(ByteBuf buf) {
-    try {
-      final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
-          RaftClientRequestProto.parseFrom(buf.nioBuffer()));
-      final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
-      final boolean isPrimary = server.getId().equals(request.getServerId());
-      return new StreamInfo(request, isPrimary, stateMachine.data().stream(request),
-          isPrimary? proxies.getDataStreamOutput(request): Collections.emptyList());
-    } catch (Throwable e) {
-      throw new CompletionException(e);
-    }
-  }
-
-  static <T> CompletableFuture<T> composeAsync(AtomicReference<CompletableFuture<T>> future, Executor executor,
-      Function<T, CompletableFuture<T>> function) {
-    final CompletableFuture<T> composed = future.get().thenComposeAsync(function, executor);
-    future.set(composed);
-    return composed;
-  }
-
-  static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
-    long byteWritten = 0;
-    for (ByteBuffer buffer : buf.nioBuffers()) {
-      try {
-        byteWritten += channel.write(buffer);
-      } catch (Throwable t) {
-        throw new CompletionException(t);
-      }
-    }
-    return byteWritten;
-  }
-
-  static long close(DataStream stream) {
-    try {
-      stream.getWritableByteChannel().close();
-      return 0L;
-    } catch (IOException e) {
-      throw new CompletionException("Failed to close " + stream, e);
-    }
-  }
-
-  private void sendReplyNotSuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, ChannelHandlerContext ctx) {
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), buffer, -1, false, request.getType());
-    ctx.writeAndFlush(reply);
-  }
-
-  static void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, long bytesWritten,
-      ChannelHandlerContext ctx) {
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), buffer, bytesWritten, true, request.getType());
-    ctx.writeAndFlush(reply);
-  }
-
-  private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
-      DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
-      if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
-        sendReplyNotSuccess(request, null, ctx);
-      } else {
-        sendReplySuccess(request, null, bytesWritten, ctx);
-      }
-  }
 
   private ChannelInboundHandler newChannelInboundHandlerAdapter(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        read(ctx, (DataStreamRequestByteBuf)msg);
-      }
-    };
-  }
-
-  private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
-      ChannelHandlerContext ctx) {
-    try {
-      return server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
-        if (reply.isSuccess()) {
-          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
-          sendReplySuccess(request, buffer, -1, ctx);
-        } else if (request.getType() == Type.STREAM_CLOSE) {
-          // if this server is not the leader, forward start transition to the other peers
-          // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx, reply);
-        } else if (request.getType() == Type.START_TRANSACTION){
-          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
-          sendReplyNotSuccess(request, buffer, ctx);
-        } else {
-          throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
+        if (msg instanceof DataStreamRequestByteBuf) {
+          requests.read((DataStreamRequestByteBuf)msg, ctx, proxies::getDataStreamOutput);
         }
-      }, executor);
-    } catch (IOException e) {
-      sendReplyNotSuccess(request, null, ctx);
-      return CompletableFuture.completedFuture(null);
-    }
-  }
-
-  private void sendLeaderFailedReply(final List<CompletableFuture<DataStreamReply>> results,
-      final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx, RaftClientReply localReply) {
-    // get replies from the results, ignored exceptional replies
-    final Stream<RaftClientReply> remoteReplies = results.stream()
-        .filter(r -> !r.isCompletedExceptionally())
-        .map(CompletableFuture::join)
-        .map(this::getRaftClientReply);
-
-    // choose the leader's reply if there is any.  Otherwise, use the local reply
-    final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
-        .filter(reply -> reply.getNotLeaderException() == null)
-        .findAny().orElse(localReply);
-
-    // send reply
-    final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(chosen).toByteString().asReadOnlyByteBuffer();
-    sendReplyNotSuccess(request, buffer, ctx);
-  }
-
-  private void forwardStartTransaction(
-      final StreamInfo info, final DataStreamRequestByteBuf request,
-      final ChannelHandlerContext ctx, RaftClientReply localReply) {
-    final List<CompletableFuture<DataStreamReply>> results = info.applyToRemotes(
-        out -> out.startTransaction(request, ctx, executor));
-
-    JavaUtils.allOf(results).thenAccept(v -> {
-      for (CompletableFuture<DataStreamReply> result : results) {
-        if (result.join().isSuccess()) {
-          return;
-        }
-      }
-
-      sendLeaderFailedReply(results, request, ctx, localReply);
-    });
-  }
-
-  private RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
-    if (dataStreamReply instanceof DataStreamReplyByteBuffer) {
-      try {
-        return ClientProtoUtils.toRaftClientReply(
-            RaftClientReplyProto.parseFrom(((DataStreamReplyByteBuffer) dataStreamReply).slice()));
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalStateException(this + ": Failed to decode RaftClientReply");
       }
-    } else {
-      throw new IllegalStateException(this + ": Unexpected reply type");
-    }
-  }
-
-  private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
-    LOG.debug("{}: read {}", this, request);
-    final ByteBuf buf = request.slice();
-    final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-
-    if (request.getType() == Type.START_TRANSACTION) {
-      // for peers to start transaction
-      final StreamInfo info = streams.get(key);
-      composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
-          .thenAccept(v -> buf.release());
-      return;
-    }
-
-    final StreamInfo info;
-    final CompletableFuture<Long> localWrite;
-    final List<CompletableFuture<DataStreamReply>> remoteWrites;
-    if (request.getType() == Type.STREAM_HEADER) {
-      info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
-      localWrite = CompletableFuture.completedFuture(0L);
-      remoteWrites = Collections.emptyList();
-    } else if (request.getType() == Type.STREAM_DATA) {
-      info = streams.get(key);
-      localWrite = info.getLocal().write(buf, executor);
-      remoteWrites = info.applyToRemotes(out -> out.write(request));
-    } else if (request.getType() == Type.STREAM_CLOSE) {
-      info = streams.get(key);
-      localWrite = info.getLocal().close(executor);
-      remoteWrites = info.isPrimary()? info.applyToRemotes(RemoteStream::close): Collections.emptyList();
-    } else {
-      throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
-    }
-
-    composeAsync(info.getPrevious(), executor, n -> JavaUtils.allOf(remoteWrites)
-        .thenCombineAsync(localWrite, (v, bytesWritten) -> {
-          if (request.getType() == Type.STREAM_HEADER
-              || request.getType() == Type.STREAM_DATA) {
-            sendReply(remoteWrites, request, bytesWritten, ctx);
-          } else if (request.getType() == Type.STREAM_CLOSE) {
-            if (info.isPrimary()) {
-              // after all server close stream, primary server start transaction
-              // TODO(runzhiwang): send start transaction to leader directly
-              startTransaction(info, request, ctx);
-            } else {
-              sendReply(remoteWrites, request, bytesWritten, ctx);
-            }
-          } else {
-            throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
-          }
-          buf.release();
-          return null;
-        }, executor));
-  }
-
-  private boolean checkSuccessRemoteWrite(
-          List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
-    for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
-      final DataStreamReply reply = replyFuture.join();
-      if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
-        return false;
-      }
-    }
-    return true;
+    };
   }
 
   private ChannelInitializer<SocketChannel> getInitializer(){