You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/03 02:48:06 UTC
[incubator-uniffle] branch master updated: [ISSUE-106][IMPROVEMENT] Set rpc timeout for all rpc interface (#113)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 503f796 [ISSUE-106][IMPROVEMENT] Set rpc timeout for all rpc interface (#113)
503f796 is described below
commit 503f796c494709e18eef19f2f28c5791b2590df2
Author: xianjingfeng <58...@qq.com>
AuthorDate: Wed Aug 3 10:48:01 2022 +0800
[ISSUE-106][IMPROVEMENT] Set rpc timeout for all rpc interface (#113)
### What changes were proposed in this pull request?
Solve issue #106: set an RPC timeout for all RPC interfaces.
### Why are the changes needed?
If the shuffle server encounters an OOM, the RPC client blocks until the shuffle server is killed.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Not needed.
---
.../java/org/apache/uniffle/test/QuorumTest.java | 13 +++++++--
.../client/impl/grpc/ShuffleServerGrpcClient.java | 33 ++++++++++------------
2 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 3dc71f4..731c39e 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -152,11 +152,11 @@ public class QuorumTest extends ShuffleReadWriteBase {
// spark.rss.data.replica.write=2
// spark.rss.data.replica.read=2
((ShuffleServerGrpcClient)ShuffleServerClientFactory
- .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(100);
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(200);
((ShuffleServerGrpcClient)ShuffleServerClientFactory
- .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo1)).adjustTimeout(100);
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo1)).adjustTimeout(200);
((ShuffleServerGrpcClient)ShuffleServerClientFactory
- .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo2)).adjustTimeout(100);
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo2)).adjustTimeout(200);
}
@AfterEach
@@ -166,6 +166,13 @@ public class QuorumTest extends ShuffleReadWriteBase {
}
cleanCluster();
initCluster();
+ // we need to restore `rpcTimeout`, or some unit tests may fail
+ ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(60000);
+ ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo1)).adjustTimeout(60000);
+ ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo2)).adjustTimeout(60000);
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index ce3d7cd..b9ac7b8 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -108,6 +108,10 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
super(host, port, maxRetryAttempts, usePlaintext);
blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
}
+
+ private ShuffleServerBlockingStub getBlockingStub() {
+ return blockingStub.withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS);
+ }
@Override
public String getDesc() {
@@ -135,7 +139,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
}
}
reqBuilder.setRemoteStorage(rsBuilder.build());
- return blockingStub.registerShuffle(reqBuilder.build());
+ return getBlockingStub().registerShuffle(reqBuilder.build());
}
private ShuffleCommitResponse doSendCommit(String appId, int shuffleId) {
@@ -144,8 +148,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
int retryNum = 0;
while (retryNum <= maxRetryAttempts) {
try {
- ShuffleCommitResponse response = blockingStub.withDeadlineAfter(
- RPC_TIMEOUT_DEFAULT_MS, TimeUnit.MILLISECONDS).commitShuffleTask(request);
+ ShuffleCommitResponse response = getBlockingStub().commitShuffleTask(request);
return response;
} catch (Exception e) {
retryNum++;
@@ -164,8 +167,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
public long requirePreAllocation(int requireSize, int retryMax, long retryIntervalMax) {
RequireBufferRequest rpcRequest = RequireBufferRequest.newBuilder().setRequireSize(requireSize).build();
long start = System.currentTimeMillis();
- RequireBufferResponse rpcResponse = blockingStub.withDeadlineAfter(
- RPC_TIMEOUT_DEFAULT_MS, TimeUnit.MILLISECONDS).requireBuffer(rpcRequest);
+ RequireBufferResponse rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
int retry = 0;
long result = FAILED_REQUIRE_ID;
Random random = new Random();
@@ -186,8 +188,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
} catch (Exception e) {
LOG.warn("Exception happened when require pre allocation from " + host + ":" + port, e);
}
- rpcResponse = blockingStub.withDeadlineAfter(
- RPC_TIMEOUT_DEFAULT_MS, TimeUnit.MILLISECONDS).requireBuffer(rpcRequest);
+ rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
retry++;
}
if (rpcResponse.getStatus() == StatusCode.SUCCESS) {
@@ -293,8 +294,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
int retryNum = 0;
while (retryNum < maxRetryAttempts) {
try {
- SendShuffleDataResponse response = blockingStub.withDeadlineAfter(
- rpcTimeout, TimeUnit.MILLISECONDS).sendShuffleData(rpcRequest);
+ SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
return response;
} catch (Exception e) {
retryNum++;
@@ -342,7 +342,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
FinishShuffleRequest rpcRequest = FinishShuffleRequest.newBuilder()
.setAppId(request.getAppId()).setShuffleId(request.getShuffleId()).build();
long start = System.currentTimeMillis();
- FinishShuffleResponse rpcResponse = blockingStub.finishShuffle(rpcRequest);
+ FinishShuffleResponse rpcResponse = getBlockingStub().finishShuffle(rpcRequest);
RssFinishShuffleResponse response;
if (rpcResponse.getStatus() != StatusCode.SUCCESS) {
@@ -404,8 +404,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
int retryNum = 0;
while (retryNum < maxRetryAttempts) {
try {
- ReportShuffleResultResponse response = blockingStub.withDeadlineAfter(
- rpcTimeout, TimeUnit.MILLISECONDS).reportShuffleResult(rpcRequest);
+ ReportShuffleResultResponse response = getBlockingStub().reportShuffleResult(rpcRequest);
return response;
} catch (Exception e) {
retryNum++;
@@ -424,9 +423,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
.setShuffleId(request.getShuffleId())
.setPartitionId(request.getPartitionId())
.build();
- GetShuffleResultResponse rpcResponse = blockingStub
- .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS)
- .getShuffleResult(rpcRequest);
+ GetShuffleResultResponse rpcResponse = getBlockingStub().getShuffleResult(rpcRequest);
StatusCode statusCode = rpcResponse.getStatus();
RssGetShuffleResultResponse response;
@@ -463,7 +460,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
.setLength(request.getLength())
.build();
long start = System.currentTimeMillis();
- GetLocalShuffleDataResponse rpcResponse = blockingStub.getLocalShuffleData(rpcRequest);
+ GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest);
String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
+ request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
LOG.info("GetShuffleData from {}:{} for {} cost {} ms", host, port, requestInfo,
@@ -498,7 +495,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
.setPartitionNum(request.getPartitionNum())
.build();
long start = System.currentTimeMillis();
- GetLocalShuffleIndexResponse rpcResponse = blockingStub.getLocalShuffleIndex(rpcRequest);
+ GetLocalShuffleIndexResponse rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest);
String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
+ request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
LOG.info("GetShuffleIndex from {}:{} for {} cost {} ms", host, port,
@@ -535,7 +532,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
.build();
long start = System.currentTimeMillis();
- GetMemoryShuffleDataResponse rpcResponse = blockingStub.getMemoryShuffleData(rpcRequest);
+ GetMemoryShuffleDataResponse rpcResponse = getBlockingStub().getMemoryShuffleData(rpcRequest);
String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
+ request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
LOG.info("GetInMemoryShuffleData from {}:{} for " + requestInfo + " cost "