You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:45 UTC

[incubator-uniffle] 01/17: [Improvement] Avoid using the default forkjoin pool by parallelStream directly (#180)

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 46b62b2406a547dca6f6b933ee187047e3618202
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Tue Jun 21 14:15:59 2022 +0800

    [Improvement] Avoid using the default forkjoin pool by parallelStream directly (#180)
    
    ### What changes were proposed in this pull request?
    As we know that parallelStream will use the default forkjoin pool in entire jvm. To avoid it, use the custom pool and allow to specify the pool size.
    
    ### Why are the changes needed?
    use separate forkjoin pool to send shuffle data
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, introduce the configuration to control the size of forkjoinpool.
    mapreduce.rss.client.data.transfer.pool.size for MapReduce
    spark.rss.client.data.transfer.pool.size for Spark
    
    ### How was this patch tested?
    GA passed.
---
 .../org/apache/hadoop/mapreduce/RssMRConfig.java   |  4 ++++
 .../org/apache/hadoop/mapreduce/RssMRUtils.java    |  5 ++++-
 .../org/apache/spark/shuffle/RssSparkConfig.java   |  4 ++++
 .../apache/spark/shuffle/RssShuffleManager.java    |  5 ++++-
 .../apache/spark/shuffle/RssShuffleManager.java    | 14 ++++++++++---
 .../rss/client/factory/ShuffleClientFactory.java   |  4 ++--
 .../rss/client/impl/ShuffleWriteClientImpl.java    | 24 ++++++++++++++--------
 .../tencent/rss/client/util/RssClientConfig.java   |  2 ++
 .../client/impl/ShuffleWriteClientImplTest.java    |  2 +-
 .../test/java/com/tencent/rss/test/QuorumTest.java |  2 +-
 .../tencent/rss/test/ShuffleServerGrpcTest.java    |  2 +-
 .../tencent/rss/test/ShuffleWithRssClientTest.java |  2 +-
 12 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index a191e2f..3447f09 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -52,6 +52,10 @@ public class RssMRConfig {
       RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE;
   public static final String RSS_DATA_REPLICA_SKIP_ENABLED =
       MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
+  public static final String RSS_DATA_TRANSFER_POOL_SIZE =
+          MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
+  public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
+          RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
   public static final String RSS_CLIENT_SEND_THREAD_NUM =
       MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
   public static final int RSS_CLIENT_DEFAULT_SEND_THREAD_NUM =
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 1d8b4d6..16613e1 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -90,10 +90,13 @@ public class RssMRUtils {
         RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
     boolean replicaSkipEnabled = jobConf.getBoolean(RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
         RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
+    int dataTransferPoolSize = jobConf.getInt(RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+        RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
     ShuffleWriteClient client = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
-            heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled);
+            heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled,
+                dataTransferPoolSize);
     return client;
   }
 
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 9720ff0..8d5dda9 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -106,6 +106,10 @@ public class RssSparkConfig {
   public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE = RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE;
   public static final String RSS_DATA_REPLICA_SKIP_ENABLED =
       SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
+  public static final String RSS_DATA_TRANSFER_POOL_SIZE =
+      SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
+  public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
+      RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
   public static final String RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE =
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f1f2a36..5d11c39 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -82,6 +82,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataReplicaWrite;
   private final int dataReplicaRead;
   private final boolean dataReplicaSkipEnabled;
+  private final int dataTransferPoolSize;
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
   private RemoteStorageInfo remoteStorage;
@@ -144,6 +145,8 @@ public class RssShuffleManager implements ShuffleManager {
         RssSparkConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
     this.dataReplicaRead =  sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
         RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+    this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
     this.dataReplicaSkipEnabled = sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
         RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     LOG.info("Check quorum config ["
@@ -167,7 +170,7 @@ public class RssShuffleManager implements ShuffleManager {
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled);
+          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 88a7bf8..1cfacd2 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -82,6 +82,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataReplicaWrite;
   private final int dataReplicaRead;
   private final boolean dataReplicaSkipEnabled;
+  private final int dataTransferPoolSize;
   private ShuffleWriteClient shuffleWriteClient;
   private final Map<String, Set<Long>> taskToSuccessBlockIds;
   private final Map<String, Set<Long>> taskToFailedBlockIds;
@@ -155,7 +156,7 @@ public class RssShuffleManager implements ShuffleManager {
     this.heartbeatInterval = sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_INTERVAL,
         RssSparkConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
     this.heartbeatTimeout = sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT, heartbeatInterval / 2);
-    int retryMax = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_RETRY_MAX,
+    final int retryMax = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_RETRY_MAX,
         RssSparkConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE,
         RssSparkConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
@@ -167,10 +168,14 @@ public class RssShuffleManager implements ShuffleManager {
         RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
     int heartBeatThreadNum = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
         RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+
+    this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled);
+          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
@@ -233,10 +238,13 @@ public class RssShuffleManager implements ShuffleManager {
       RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
     int heartBeatThreadNum = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
       RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+    this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+
      shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled);
+          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.taskToFailedBlockIds = taskToFailedBlockIds;
     if (loop != null) {
diff --git a/client/src/main/java/com/tencent/rss/client/factory/ShuffleClientFactory.java b/client/src/main/java/com/tencent/rss/client/factory/ShuffleClientFactory.java
index aefb7b6..bc4b0cf 100644
--- a/client/src/main/java/com/tencent/rss/client/factory/ShuffleClientFactory.java
+++ b/client/src/main/java/com/tencent/rss/client/factory/ShuffleClientFactory.java
@@ -37,9 +37,9 @@ public class ShuffleClientFactory {
 
   public ShuffleWriteClient createShuffleWriteClient(
       String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
-      int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled) {
+      int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled, int dataTransferPoolSize) {
     return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-      replica, replicaWrite, replicaRead, replicaSkipEnabled);
+      replica, replicaWrite, replicaRead, replicaSkipEnabled, dataTransferPoolSize);
   }
 
   public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest request) {
diff --git a/client/src/main/java/com/tencent/rss/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/com/tencent/rss/client/impl/ShuffleWriteClientImpl.java
index d8bfe99..34571f2 100644
--- a/client/src/main/java/com/tencent/rss/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/com/tencent/rss/client/impl/ShuffleWriteClientImpl.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,19 +88,24 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   private int replicaWrite;
   private int replicaRead;
   private boolean replicaSkipEnabled;
+  private int dataTranferPoolSize;
+  private final ForkJoinPool dataTransferPool;
 
   public ShuffleWriteClientImpl(String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
-                                int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled) {
+                                int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled,
+                                int dataTranferPoolSize) {
     this.clientType = clientType;
     this.retryMax = retryMax;
     this.retryIntervalMax = retryIntervalMax;
-    coordinatorClientFactory = new CoordinatorClientFactory(clientType);
-    heartBeatExecutorService = Executors.newFixedThreadPool(heartBeatThreadNum,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-heartbeat-%d").build());
+    this.coordinatorClientFactory = new CoordinatorClientFactory(clientType);
+    this.heartBeatExecutorService = Executors.newFixedThreadPool(heartBeatThreadNum,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-heartbeat-%d").build());
     this.replica = replica;
     this.replicaWrite = replicaWrite;
     this.replicaRead = replicaRead;
     this.replicaSkipEnabled = replicaSkipEnabled;
+    this.dataTranferPoolSize = dataTranferPoolSize;
+    this.dataTransferPool = new ForkJoinPool(dataTranferPoolSize);
   }
 
   private boolean sendShuffleDataAsync(
@@ -110,13 +116,13 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     // If one or more servers is failed, the sending is not totally successful.
     AtomicBoolean isAllServersSuccess = new AtomicBoolean(true);
     if (serverToBlocks != null) {
-      serverToBlocks.entrySet().parallelStream().forEach(entry -> {
+      dataTransferPool.submit(() -> serverToBlocks.entrySet().parallelStream().forEach(entry -> {
         ShuffleServerInfo ssi = entry.getKey();
         try {
           Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = entry.getValue();
           // todo: compact unnecessary blocks that reach replicaWrite
           RssSendShuffleDataRequest request = new RssSendShuffleDataRequest(
-            appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
+                  appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
           long s = System.currentTimeMillis();
           RssSendShuffleDataResponse response = getShuffleServerClient(ssi).sendShuffleData(request);
           LOG.info("ShuffleWriteClientImpl sendShuffleData cost:" + (System.currentTimeMillis() - s));
@@ -125,17 +131,17 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
             // mark a replica of block that has been sent
             serverToBlockIds.get(ssi).forEach(block -> blockIdsTracker.get(block).incrementAndGet());
             LOG.info("Send: " + serverToBlockIds.get(ssi).size()
-              + " blocks to [" + ssi.getId() + "] successfully");
+                    + " blocks to [" + ssi.getId() + "] successfully");
           } else {
             isAllServersSuccess.set(false);
             LOG.warn("Send: " + serverToBlockIds.get(ssi).size() + " blocks to [" + ssi.getId()
-              + "] failed with statusCode[" + response.getStatusCode() + "], ");
+                    + "] failed with statusCode[" + response.getStatusCode() + "], ");
           }
         } catch (Exception e) {
           isAllServersSuccess.set(false);
           LOG.warn("Send: " + serverToBlockIds.get(ssi).size() + " blocks to [" + ssi.getId() + "] failed.", e);
         }
-      });
+      })).join();
     }
     return isAllServersSuccess.get();
   }
diff --git a/client/src/main/java/com/tencent/rss/client/util/RssClientConfig.java b/client/src/main/java/com/tencent/rss/client/util/RssClientConfig.java
index e1c6df7..c2ca8fb 100644
--- a/client/src/main/java/com/tencent/rss/client/util/RssClientConfig.java
+++ b/client/src/main/java/com/tencent/rss/client/util/RssClientConfig.java
@@ -35,6 +35,8 @@ public class RssClientConfig {
   public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE = 1;
   public static final String RSS_DATA_REPLICA_SKIP_ENABLED = "rss.data.replica.skip.enabled";
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE = true;
+  public static final String RSS_DATA_TRANSFER_POOL_SIZE = "rss.client.data.transfer.pool.size";
+  public static final int RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE = Runtime.getRuntime().availableProcessors();
   public static final String RSS_HEARTBEAT_INTERVAL = "rss.heartbeat.interval";
   public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE = 10 * 1000L;
   public static final String RSS_HEARTBEAT_TIMEOUT = "rss.heartbeat.timeout";
diff --git a/client/src/test/java/com/tencent/rss/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/com/tencent/rss/client/impl/ShuffleWriteClientImplTest.java
index 16e889c..e3b4443 100644
--- a/client/src/test/java/com/tencent/rss/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/com/tencent/rss/client/impl/ShuffleWriteClientImplTest.java
@@ -42,7 +42,7 @@ public class ShuffleWriteClientImplTest {
   @Test
   public void testSendData() {
     ShuffleWriteClientImpl shuffleWriteClient =
-        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true);
+        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1);
     ShuffleServerClient mockShuffleServerClient = mock(ShuffleServerClient.class);
     ShuffleWriteClientImpl spyClient = spy(shuffleWriteClient);
     doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
diff --git a/integration-test/common/src/test/java/com/tencent/rss/test/QuorumTest.java b/integration-test/common/src/test/java/com/tencent/rss/test/QuorumTest.java
index 01614ae..0eeb2f0 100644
--- a/integration-test/common/src/test/java/com/tencent/rss/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/com/tencent/rss/test/QuorumTest.java
@@ -259,7 +259,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
       int replica, int replicaWrite, int replicaRead, boolean replicaSkip) {
 
     shuffleWriteClientImpl = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-      replica, replicaWrite, replicaRead, replicaSkip);
+      replica, replicaWrite, replicaRead, replicaSkip, 1);
 
     List<ShuffleServerInfo> allServers = Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1,
         shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
diff --git a/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleServerGrpcTest.java
index b4ea72a..3b30828 100644
--- a/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleServerGrpcTest.java
@@ -102,7 +102,7 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
   public void clearResourceTest() throws Exception {
     final ShuffleWriteClient shuffleWriteClient =
         ShuffleClientFactory.getInstance().createShuffleWriteClient(
-            "GRPC", 2, 10000L, 4, 1, 1, 1, true);
+            "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1);
     shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
     shuffleWriteClient.registerShuffle(
         new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
diff --git a/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleWithRssClientTest.java
index 2a6f2d9..a23d3a4 100644
--- a/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleWithRssClientTest.java
@@ -89,7 +89,7 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
   @BeforeEach
   public void createClient() {
     shuffleWriteClientImpl = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-      1, 1, 1, true);
+      1, 1, 1, true, 1);
   }
 
   @AfterEach