You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/03 02:48:06 UTC

[incubator-uniffle] branch master updated: [ISSUE-106][IMPROVEMENT] Set rpc timeout for all rpc interface (#113)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 503f796  [ISSUE-106][IMPROVEMENT] Set rpc timeout for all rpc interface (#113)
503f796 is described below

commit 503f796c494709e18eef19f2f28c5791b2590df2
Author: xianjingfeng <58...@qq.com>
AuthorDate: Wed Aug 3 10:48:01 2022 +0800

    [ISSUE-106][IMPROVEMENT] Set rpc timeout for all rpc interface (#113)
    
    ### What changes were proposed in this pull request?
    Resolves issue #106: set an RPC timeout for all RPC interfaces.
    
    ### Why are the changes needed?
    If the shuffle server encounters an OOM, the RPC client will block until the shuffle server is killed.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No need
---
 .../java/org/apache/uniffle/test/QuorumTest.java   | 13 +++++++--
 .../client/impl/grpc/ShuffleServerGrpcClient.java  | 33 ++++++++++------------
 2 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 3dc71f4..731c39e 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -152,11 +152,11 @@ public class QuorumTest extends ShuffleReadWriteBase {
     // spark.rss.data.replica.write=2
     // spark.rss.data.replica.read=2
     ((ShuffleServerGrpcClient)ShuffleServerClientFactory
-      .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(100);
+      .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(200);
     ((ShuffleServerGrpcClient)ShuffleServerClientFactory
-      .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo1)).adjustTimeout(100);
+      .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo1)).adjustTimeout(200);
     ((ShuffleServerGrpcClient)ShuffleServerClientFactory
-      .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo2)).adjustTimeout(100);
+      .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo2)).adjustTimeout(200);
   }
 
   @AfterEach
@@ -166,6 +166,13 @@ public class QuorumTest extends ShuffleReadWriteBase {
     }
     cleanCluster();
     initCluster();
+    // we need to restore `rpcTimeout`, or some unit tests may fail
+    ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+            .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(60000);
+    ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+            .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo1)).adjustTimeout(60000);
+    ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+            .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo2)).adjustTimeout(60000);
   }
 
 
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index ce3d7cd..b9ac7b8 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -108,6 +108,10 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
     super(host, port, maxRetryAttempts, usePlaintext);
     blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
   }
+  
+  private ShuffleServerBlockingStub getBlockingStub() {
+    return blockingStub.withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS);
+  }
 
   @Override
   public String getDesc() {
@@ -135,7 +139,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
       }
     }
     reqBuilder.setRemoteStorage(rsBuilder.build());
-    return blockingStub.registerShuffle(reqBuilder.build());
+    return getBlockingStub().registerShuffle(reqBuilder.build());
   }
 
   private ShuffleCommitResponse doSendCommit(String appId, int shuffleId) {
@@ -144,8 +148,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
     int retryNum = 0;
     while (retryNum <= maxRetryAttempts) {
       try {
-        ShuffleCommitResponse response = blockingStub.withDeadlineAfter(
-            RPC_TIMEOUT_DEFAULT_MS, TimeUnit.MILLISECONDS).commitShuffleTask(request);
+        ShuffleCommitResponse response = getBlockingStub().commitShuffleTask(request);
         return response;
       } catch (Exception e) {
         retryNum++;
@@ -164,8 +167,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
   public long requirePreAllocation(int requireSize, int retryMax, long retryIntervalMax) {
     RequireBufferRequest rpcRequest = RequireBufferRequest.newBuilder().setRequireSize(requireSize).build();
     long start = System.currentTimeMillis();
-    RequireBufferResponse rpcResponse = blockingStub.withDeadlineAfter(
-        RPC_TIMEOUT_DEFAULT_MS, TimeUnit.MILLISECONDS).requireBuffer(rpcRequest);
+    RequireBufferResponse rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
     int retry = 0;
     long result = FAILED_REQUIRE_ID;
     Random random = new Random();
@@ -186,8 +188,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
       } catch (Exception e) {
         LOG.warn("Exception happened when require pre allocation from " + host + ":" + port, e);
       }
-      rpcResponse = blockingStub.withDeadlineAfter(
-          RPC_TIMEOUT_DEFAULT_MS, TimeUnit.MILLISECONDS).requireBuffer(rpcRequest);
+      rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
       retry++;
     }
     if (rpcResponse.getStatus() == StatusCode.SUCCESS) {
@@ -293,8 +294,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
     int retryNum = 0;
     while (retryNum < maxRetryAttempts) {
       try {
-        SendShuffleDataResponse response = blockingStub.withDeadlineAfter(
-            rpcTimeout, TimeUnit.MILLISECONDS).sendShuffleData(rpcRequest);
+        SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
         return response;
       } catch (Exception e) {
         retryNum++;
@@ -342,7 +342,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
     FinishShuffleRequest rpcRequest = FinishShuffleRequest.newBuilder()
         .setAppId(request.getAppId()).setShuffleId(request.getShuffleId()).build();
     long start = System.currentTimeMillis();
-    FinishShuffleResponse rpcResponse = blockingStub.finishShuffle(rpcRequest);
+    FinishShuffleResponse rpcResponse = getBlockingStub().finishShuffle(rpcRequest);
 
     RssFinishShuffleResponse response;
     if (rpcResponse.getStatus() != StatusCode.SUCCESS) {
@@ -404,8 +404,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
     int retryNum = 0;
     while (retryNum < maxRetryAttempts) {
       try {
-        ReportShuffleResultResponse response = blockingStub.withDeadlineAfter(
-            rpcTimeout, TimeUnit.MILLISECONDS).reportShuffleResult(rpcRequest);
+        ReportShuffleResultResponse response = getBlockingStub().reportShuffleResult(rpcRequest);
         return response;
       } catch (Exception e) {
         retryNum++;
@@ -424,9 +423,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
         .setShuffleId(request.getShuffleId())
         .setPartitionId(request.getPartitionId())
         .build();
-    GetShuffleResultResponse rpcResponse = blockingStub
-        .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS)
-        .getShuffleResult(rpcRequest);
+    GetShuffleResultResponse rpcResponse = getBlockingStub().getShuffleResult(rpcRequest);
     StatusCode statusCode = rpcResponse.getStatus();
 
     RssGetShuffleResultResponse response;
@@ -463,7 +460,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
         .setLength(request.getLength())
         .build();
     long start = System.currentTimeMillis();
-    GetLocalShuffleDataResponse rpcResponse = blockingStub.getLocalShuffleData(rpcRequest);
+    GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest);
     String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
         + request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
     LOG.info("GetShuffleData from {}:{} for {} cost {} ms", host, port, requestInfo,
@@ -498,7 +495,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
         .setPartitionNum(request.getPartitionNum())
         .build();
     long start = System.currentTimeMillis();
-    GetLocalShuffleIndexResponse rpcResponse = blockingStub.getLocalShuffleIndex(rpcRequest);
+    GetLocalShuffleIndexResponse rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest);
     String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
         + request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
     LOG.info("GetShuffleIndex from {}:{} for {} cost {} ms", host, port,
@@ -535,7 +532,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
         .build();
 
     long start = System.currentTimeMillis();
-    GetMemoryShuffleDataResponse rpcResponse = blockingStub.getMemoryShuffleData(rpcRequest);
+    GetMemoryShuffleDataResponse rpcResponse = getBlockingStub().getMemoryShuffleData(rpcRequest);
     String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
         + request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
     LOG.info("GetInMemoryShuffleData from {}:{} for " + requestInfo + " cost "