You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/06 08:03:50 UTC

[incubator-ratis] branch master updated: RATIS-1133. Primary and peer should use the same RaftClientRequest (#258)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 31381b6  RATIS-1133. Primary and peer should use the same RaftClientRequest (#258)
31381b6 is described below

commit 31381b69f87587fd34fc252ba5732b741cdbd2b8
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Nov 6 16:01:14 2020 +0800

    RATIS-1133. Primary and peer should use the same RaftClientRequest (#258)
---
 .../org/apache/ratis/client/DataStreamRpcApi.java   |  4 ++--
 .../ratis/client/impl/DataStreamClientImpl.java     | 21 +++++++++------------
 .../ratis/netty/server/NettyServerStreamRpc.java    | 15 +++++++--------
 .../apache/ratis/datastream/DataStreamBaseTest.java |  7 ++-----
 4 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamRpcApi.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamRpcApi.java
index 409e4e1..b7b5e3e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamRpcApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamRpcApi.java
@@ -20,10 +20,10 @@ package org.apache.ratis.client;
 
 import org.apache.ratis.client.api.DataStreamApi;
 import org.apache.ratis.client.api.DataStreamOutput;
-import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftClientRequest;
 
 /** An RPC interface which extends the user interface {@link DataStreamApi}. */
 public interface DataStreamRpcApi extends DataStreamApi {
   /** Create a stream for primary server to send data to peer server. */
-  DataStreamOutput stream(RaftGroupId groupId, long streamId);
+  DataStreamOutput stream(RaftClientRequest request);
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 9efe56c..bf54fb9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -63,20 +63,15 @@ public class DataStreamClientImpl implements DataStreamClient {
     this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
   }
 
-  public class DataStreamOutputImpl implements DataStreamOutputRpc {
+  public final class DataStreamOutputImpl implements DataStreamOutputRpc {
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
 
     private long streamOffset = 0;
 
-    public DataStreamOutputImpl(RaftGroupId groupId) {
-      this(groupId, RaftClientImpl.nextCallId());
-    }
-
-    public DataStreamOutputImpl(RaftGroupId groupId, long streamId) {
-      this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, streamId,
-          RaftClientRequest.writeRequestType());
-      this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
+    private DataStreamOutputImpl(RaftClientRequest request) {
+      this.header = request;
+      this.headerFuture = orderedStreamAsync.sendRequest(request.getCallId(), -1,
           ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
     }
 
@@ -133,12 +128,14 @@ public class DataStreamClientImpl implements DataStreamClient {
 
   @Override
   public DataStreamOutputRpc stream(RaftGroupId gid) {
-    return new DataStreamOutputImpl(gid);
+    RaftClientRequest request = new RaftClientRequest(
+        clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
+    return new DataStreamOutputImpl(request);
   }
 
   @Override
-  public DataStreamOutputRpc stream(RaftGroupId gid, long streamId) {
-    return new DataStreamOutputImpl(gid, streamId);
+  public DataStreamOutputRpc stream(RaftClientRequest request) {
+    return new DataStreamOutputImpl(request);
   }
 
   @Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index f983684..6d3143a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -31,7 +31,6 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
@@ -96,10 +95,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       peers.addAll(newPeers);
     }
 
-    List<DataStreamOutputRpc> getDataStreamOutput(RaftGroupId groupId, long streamId) throws IOException {
+    List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request) throws IOException {
       final List<DataStreamOutputRpc> outs = new ArrayList<>();
       try {
-        getDataStreamOutput(outs, groupId, streamId);
+        getDataStreamOutput(outs, request);
       } catch (IOException e) {
         outs.forEach(CloseAsync::closeAsync);
         throw e;
@@ -107,11 +106,11 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return outs;
     }
 
-    private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftGroupId groupId, long streamId)
+    private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftClientRequest request)
         throws IOException {
       for (RaftPeer peer : peers) {
         try {
-          outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(groupId, streamId));
+          outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(request));
         } catch (IOException e) {
           throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
         }
@@ -248,13 +247,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     proxies.addPeers(newPeers);
   }
 
-  private StreamInfo newStreamInfo(ByteBuf buf, long streamId) {
+  private StreamInfo newStreamInfo(ByteBuf buf) {
     try {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
       return new StreamInfo(request, stateMachine.data().stream(request),
-          proxies.getDataStreamOutput(request.getRaftGroupId(), streamId));
+          proxies.getDataStreamOutput(request));
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
@@ -365,7 +364,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
     if (request.getType() == Type.STREAM_HEADER) {
-      info = streams.computeIfAbsent(key, id -> newStreamInfo(buf, request.getStreamId()));
+      info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 668cb17..51c6bb7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -474,11 +474,8 @@ abstract class DataStreamBaseTest extends BaseTest {
     Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
     Assert.assertEquals(dataSize, stream.getByteWritten());
     Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
-
-    final Server primary = getPrimaryServer();
-    if (server == primary) {
-      Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
-    }
+    Assert.assertEquals(writeRequest.getClientId(), header.getClientId());
+    Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
   }
 
   static ByteBuffer initBuffer(int offset, int size) {