You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/16 23:47:27 UTC

[incubator-ratis] branch master updated: RATIS-1160. DataStreamClientImpl.closeAsync() should be idempotent. (#281)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ffedbba  RATIS-1160. DataStreamClientImpl.closeAsync() should be idempotent. (#281)
ffedbba is described below

commit ffedbba1141d636749d60bf3967f576bf71ea485
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Nov 17 07:47:19 2020 +0800

    RATIS-1160. DataStreamClientImpl.closeAsync() should be idempotent. (#281)
---
 .../ratis/client/impl/DataStreamClientImpl.java    |  15 ++-
 .../apache/ratis/protocol/RaftClientMessage.java   |  11 ++-
 .../org/apache/ratis/protocol/RaftClientReply.java |  10 +-
 .../apache/ratis/protocol/RaftClientRequest.java   |  10 +-
 .../ratis/datastream/DataStreamBaseTest.java       | 102 +++++++++++++++------
 .../ratis/datastream/TestDataStreamNetty.java      |  36 ++++----
 6 files changed, 119 insertions(+), 65 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index c078218..2acf657 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -28,6 +28,9 @@ import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -57,6 +60,8 @@ public class DataStreamClientImpl implements DataStreamClient {
   public final class DataStreamOutputImpl implements DataStreamOutputRpc {
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
+    private final MemoizedSupplier<CompletableFuture<DataStreamReply>> closeSupplier
+        = JavaUtils.memoize(() -> send(Type.STREAM_CLOSE));
 
     private long streamOffset = 0;
 
@@ -81,6 +86,10 @@ public class DataStreamClientImpl implements DataStreamClient {
     // send to the attached dataStreamClientRpc
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
+      if (isClosed()) {
+        return JavaUtils.completeExceptionally(new AlreadyClosedException(
+            clientId + ": stream already closed, request=" + header));
+      }
       final CompletableFuture<DataStreamReply> f = send(Type.STREAM_DATA, buf);
       streamOffset += buf.remaining();
       return combineHeader(f);
@@ -88,7 +97,11 @@ public class DataStreamClientImpl implements DataStreamClient {
 
     @Override
     public CompletableFuture<DataStreamReply> closeAsync() {
-      return send(Type.STREAM_CLOSE);
+      return closeSupplier.get();
+    }
+
+    boolean isClosed() {
+      return closeSupplier.isInitialized();
     }
 
     @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
index e914f7f..3dfe6d3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
@@ -23,12 +23,13 @@ public abstract class RaftClientMessage implements RaftRpcMessage {
   private final ClientId clientId;
   private final RaftPeerId serverId;
   private final RaftGroupId groupId;
+  private final long callId;
 
-  public RaftClientMessage(ClientId clientId, RaftPeerId serverId,
-      RaftGroupId groupId) {
+  RaftClientMessage(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
     this.clientId = clientId;
     this.serverId = serverId;
     this.groupId = groupId;
+    this.callId = callId;
   }
 
   @Override
@@ -54,9 +55,13 @@ public abstract class RaftClientMessage implements RaftRpcMessage {
     return groupId;
   }
 
+  public long getCallId() {
+    return callId;
+  }
+
   @Override
   public String toString() {
     return JavaUtils.getClassSimpleName(getClass()) + ":" + clientId + "->" + serverId
-        + (groupId != null? "@" + groupId: "");
+        + (groupId != null? "@" + groupId: "") + ", cid=" + getCallId();
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index a9bcd83..7973e0c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -38,7 +38,6 @@ import java.util.Collections;
  */
 public class RaftClientReply extends RaftClientMessage {
   private final boolean success;
-  private final long callId;
 
   /**
    * We mainly track two types of exceptions here:
@@ -71,9 +70,8 @@ public class RaftClientReply extends RaftClientMessage {
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
       long callId, boolean success, Message message, RaftException exception,
       long logIndex, Collection<CommitInfoProto> commitInfos) {
-    super(clientId, serverId, groupId);
+    super(clientId, serverId, groupId, callId);
     this.success = success;
-    this.callId = callId;
     this.message = message;
     this.exception = exception;
     this.logIndex = logIndex;
@@ -125,17 +123,13 @@ public class RaftClientReply extends RaftClientMessage {
     return false;
   }
 
-  public long getCallId() {
-    return callId;
-  }
-
   public long getLogIndex() {
     return logIndex;
   }
 
   @Override
   public String toString() {
-    return super.toString() + ", cid=" + getCallId() + ", "
+    return super.toString() + ", "
         + (isSuccess()? "SUCCESS":  "FAILED " + exception)
         + ", logIndex=" + getLogIndex() + ", commits" + ProtoUtils.toString(commitInfos);
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 9bc623f..331ce20 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -191,7 +191,6 @@ public class RaftClientRequest extends RaftClientMessage {
         r.getCallId(), message, RaftClientRequest.writeRequestType(), r.getSlidingWindowEntry());
   }
 
-  private final long callId;
   private final Message message;
   private final Type type;
 
@@ -204,8 +203,7 @@ public class RaftClientRequest extends RaftClientMessage {
   public RaftClientRequest(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
       long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry) {
-    super(clientId, serverId, groupId);
-    this.callId = callId;
+    super(clientId, serverId, groupId, callId);
     this.message = message;
     this.type = type;
     this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
@@ -216,10 +214,6 @@ public class RaftClientRequest extends RaftClientMessage {
     return true;
   }
 
-  public long getCallId() {
-    return callId;
-  }
-
   public SlidingWindowEntry getSlidingWindowEntry() {
     return slidingWindowEntry;
   }
@@ -238,7 +232,7 @@ public class RaftClientRequest extends RaftClientMessage {
 
   @Override
   public String toString() {
-    return super.toString() + ", cid=" + callId + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
+    return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
         + type + ", " + getMessage();
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 0cabd07..837e368 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -24,13 +24,16 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.GroupInfoReply;
 import org.apache.ratis.protocol.GroupInfoRequest;
 import org.apache.ratis.protocol.GroupListReply;
 import org.apache.ratis.protocol.GroupListRequest;
 import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.RaftClientMessage;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroup;
@@ -39,17 +42,19 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
-import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.NetUtils;
 import org.junit.Assert;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -74,6 +79,12 @@ abstract class DataStreamBaseTest extends BaseTest {
     return (byte) ('A' + pos%MODULUS);
   }
 
+  static final ByteString MOCK = ByteString.copyFromUtf8("mock");
+
+  static ByteString bytesWritten2ByteString(long bytesWritten) {
+    return ByteString.copyFromUtf8("bytesWritten=" + bytesWritten);
+  }
+
   private final Executor executor = Executors.newFixedThreadPool(16);
 
   static class MultiDataStreamStateMachine extends BaseStateMachine {
@@ -81,9 +92,9 @@ abstract class DataStreamBaseTest extends BaseTest {
 
     @Override
     public CompletableFuture<DataStream> stream(RaftClientRequest request) {
-      final SingleDataStream s = new SingleDataStream();
+      final SingleDataStream s = new SingleDataStream(request);
       streams.put(request.getCallId(), s);
-      return s.stream(request);
+      return CompletableFuture.completedFuture(s);
     }
 
     SingleDataStream getSingleDataStream(long callId) {
@@ -91,9 +102,9 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
-  static class SingleDataStream {
+  static class SingleDataStream implements DataStream {
     private int byteWritten = 0;
-    private RaftClientRequest writeRequest;
+    private final RaftClientRequest writeRequest;
 
     final WritableByteChannel channel = new WritableByteChannel() {
       private volatile boolean open = true;
@@ -122,7 +133,6 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
     };
 
-    final DataStream stream = new DataStream() {
       @Override
       public WritableByteChannel getWritableByteChannel() {
         return channel;
@@ -137,11 +147,9 @@ abstract class DataStreamBaseTest extends BaseTest {
         }
         return CompletableFuture.completedFuture(null);
       }
-    };
 
-    public CompletableFuture<DataStream> stream(RaftClientRequest request) {
-      writeRequest = request;
-      return CompletableFuture.completedFuture(stream);
+    SingleDataStream(RaftClientRequest request) {
+      this.writeRequest = request;
     }
 
     public int getByteWritten() {
@@ -204,7 +212,7 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
 
       @Override
-      public StateMachine getStateMachine(RaftGroupId groupId) {
+      public MultiDataStreamStateMachine getStateMachine(RaftGroupId groupId) {
         return stateMachines.computeIfAbsent(groupId, key -> new MultiDataStreamStateMachine());
       }
 
@@ -241,7 +249,7 @@ abstract class DataStreamBaseTest extends BaseTest {
 
       @Override
       public RaftClientReply submitClientRequest(RaftClientRequest request) {
-        return null;
+        return submitClientRequestAsync(request).join();
       }
 
       @Override
@@ -251,7 +259,11 @@ abstract class DataStreamBaseTest extends BaseTest {
 
       @Override
       public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
-        return null;
+        final MultiDataStreamStateMachine stateMachine = getStateMachine(request.getRaftGroupId());
+        final SingleDataStream stream = stateMachine.getSingleDataStream(request.getCallId());
+        Assert.assertFalse(stream.getWritableByteChannel().isOpen());
+        return CompletableFuture.completedFuture(new RaftClientReply(request,
+            () -> bytesWritten2ByteString(stream.getByteWritten()), null));
       }
 
       @Override
@@ -328,12 +340,12 @@ abstract class DataStreamBaseTest extends BaseTest {
 
     List<RaftServer> raftServers = new ArrayList<>();
     peers.forEach(peer -> raftServers.add(newRaftServer(peer, properties)));
-    setup(peers, raftServers);
+    setup(RaftGroupId.randomId(), peers, raftServers);
   }
 
 
-  void setup(List<RaftPeer> peers, List<RaftServer> raftServers) {
-    raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
+  void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
+    raftGroup = RaftGroup.valueOf(groupId, peers);
     servers = new ArrayList<>(peers.size());
     // start stream servers on raft peers.
     for (int i = 0; i < peers.size(); i++) {
@@ -358,6 +370,15 @@ abstract class DataStreamBaseTest extends BaseTest {
         .build();
   }
 
+  RaftClient newRaftClientForDataStream(ClientId clientId) {
+    return RaftClient.newBuilder()
+        .setClientId(clientId)
+        .setRaftGroup(raftGroup)
+        .setPrimaryDataStreamServer(getPrimaryServer().getPeer())
+        .setProperties(properties)
+        .build();
+  }
+
   protected void shutdown() throws IOException {
     for (Server server : servers) {
       server.close();
@@ -400,9 +421,10 @@ abstract class DataStreamBaseTest extends BaseTest {
   }
 
 
-  void runTestMockCluster(int bufferSize, int bufferNum, Exception expectedException, Exception headerException)
+  void runTestMockCluster(ClientId clientId, int bufferSize, int bufferNum,
+      Exception expectedException, Exception headerException)
       throws IOException {
-    try (final RaftClient client = newRaftClientForDataStream()) {
+    try (final RaftClient client = newRaftClientForDataStream(clientId)) {
       final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
       if (headerException != null) {
         final DataStreamReply headerReply = out.getHeaderFuture().join();
@@ -413,22 +435,18 @@ abstract class DataStreamBaseTest extends BaseTest {
         return;
       }
 
-      runTestDataStream(out, bufferSize, bufferNum);
-      DataStreamReplyByteBuffer replyByteBuffer = (DataStreamReplyByteBuffer) out.closeAsync().join();
-
-      final RaftClientReply clientReply = ClientProtoUtils.toRaftClientReply(
-          RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
+      final RaftClientReply clientReply = runTestDataStream(out, bufferSize, bufferNum).join();
       if (expectedException != null) {
-        Assert.assertFalse(replyByteBuffer.isSuccess());
+        Assert.assertFalse(clientReply.isSuccess());
         Assert.assertTrue(clientReply.getException().getMessage().contains(
             expectedException.getMessage()));
       } else {
-        Assert.assertTrue(replyByteBuffer.isSuccess());
+        Assert.assertTrue(clientReply.isSuccess());
       }
     }
   }
 
-  private void runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
+  CompletableFuture<RaftClientReply> runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
     LOG.info("start stream {}", out.getHeader().getCallId());
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
     final List<Integer> sizes = new ArrayList<>();
@@ -466,6 +484,34 @@ abstract class DataStreamBaseTest extends BaseTest {
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
+
+    final long byteWritten = dataSize;
+    return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, byteWritten));
+  }
+
+  CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
+      long byteWritten) {
+    // Verify that closeAsync() is idempotent and that writeAsync fails once the stream is closed
+    Assert.assertSame(dataStreamReply, out.closeAsync().join());
+    testFailureCase("writeAsync should fail",
+        () -> out.writeAsync(DataStreamRequestByteBuffer.EMPTY_BYTE_BUFFER).join(),
+        CompletionException.class, (Logger)null, AlreadyClosedException.class);
+
+    try {
+      final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(RaftClientReplyProto.parseFrom(
+          ((DataStreamReplyByteBuffer) dataStreamReply).slice()));
+      assertRaftClientMessage(out.getHeader(), reply);
+      if (reply.isSuccess()) {
+        final ByteString bytes = reply.getMessage().getContent();
+        if (!bytes.equals(MOCK)) {
+          Assert.assertEquals(bytesWritten2ByteString(byteWritten), bytes);
+        }
+      }
+
+      return CompletableFuture.completedFuture(reply);
+    } catch (Throwable t) {
+      return JavaUtils.completeExceptionally(t);
+    }
   }
 
   void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
@@ -475,10 +521,10 @@ abstract class DataStreamBaseTest extends BaseTest {
     Assert.assertEquals(dataSize, stream.getByteWritten());
 
     final RaftClientRequest writeRequest = stream.getWriteRequest();
-    assertRaftClientRequest(header, writeRequest);
+    assertRaftClientMessage(header, writeRequest);
   }
 
-  static void assertRaftClientRequest(RaftClientRequest expected, RaftClientRequest computed) {
+  static void assertRaftClientMessage(RaftClientRequest expected, RaftClientMessage computed) {
     Assert.assertNotNull(computed);
     Assert.assertEquals(expected.getClientId(), computed.getClientId());
     Assert.assertEquals(expected.getServerId(), computed.getServerId());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index fdf68ad..bd9e6f0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -36,6 +36,7 @@ import org.apache.ratis.util.NetUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -92,32 +93,33 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
     ClientId clientId = ClientId.randomId();
     RaftGroupId groupId = RaftGroupId.randomId();
     final RaftPeer suggestedLeader = RaftPeer.newBuilder().setId("s" + leaderIndex).build();
-    RaftClientReply expectedClientReply = new RaftClientReply(clientId, suggestedLeader.getId(), groupId, 0,
-        leaderException == null, null, leaderException, 0, null);
 
     for (int i = 0; i < numServers; i ++) {
       RaftServer raftServer = mock(RaftServer.class);
-      RaftClientReply raftClientReply;
       RaftPeerId peerId = RaftPeerId.valueOf("s" + i);
       RaftProperties properties = new RaftProperties();
       NettyConfigKeys.DataStream.setPort(properties, NetUtils.createLocalServerAddress().getPort());
       RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
 
-      if (i == leaderIndex) {
-        raftClientReply = expectedClientReply;
-      } else {
-        RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId.valueOf(peerId, groupId);
-        NotLeaderException notLeaderException = new NotLeaderException(raftGroupMemberId, suggestedLeader, null);
-        raftClientReply = new RaftClientReply(clientId, peerId,
-            groupId, 0, false, null, notLeaderException, 0, null);
-      }
-
       if (submitException != null) {
         when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
             .thenThrow(submitException);
       } else {
+        final boolean isLeader = i == leaderIndex;
         when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
-            .thenReturn(CompletableFuture.completedFuture(raftClientReply));
+            .thenAnswer((Answer<CompletableFuture<RaftClientReply>>) invocation -> {
+              final RaftClientRequest r = (RaftClientRequest) invocation.getArguments()[0];
+              final RaftClientReply reply;
+              if (isLeader) {
+                reply = leaderException != null? new RaftClientReply(r, leaderException, null)
+                    : new RaftClientReply(r, () -> MOCK, null);
+              } else {
+                final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
+                final NotLeaderException notLeaderException = new NotLeaderException(memberId, suggestedLeader, null);
+                reply = new RaftClientReply(r, notLeaderException, null);
+              }
+              return CompletableFuture.completedFuture(reply);
+            });
       }
 
       when(raftServer.getProperties()).thenReturn(properties);
@@ -131,18 +133,18 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
       raftServers.add(raftServer);
     }
 
-    runTestMockCluster(raftServers, 1_000_000, 10,
+    runTestMockCluster(groupId, raftServers, clientId, 1_000_000, 10,
         submitException != null ? submitException : leaderException, getStateMachineException);
   }
 
-  void runTestMockCluster(List<RaftServer> raftServers, int bufferSize, int bufferNum,
+  void runTestMockCluster(RaftGroupId groupId, List<RaftServer> raftServers, ClientId clientId, int bufferSize, int bufferNum,
       Exception expectedException, Exception headerException) throws Exception {
     try {
       final List<RaftPeer> peers = raftServers.stream()
           .map(TestDataStreamNetty::newRaftPeer)
           .collect(Collectors.toList());
-      setup(peers, raftServers);
-      runTestMockCluster(bufferSize, bufferNum, expectedException, headerException);
+      setup(groupId, peers, raftServers);
+      runTestMockCluster(clientId, bufferSize, bufferNum, expectedException, headerException);
     } finally {
       shutdown();
     }