You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/25 14:41:03 UTC
[doris] 27/31: [Bug](point query) cancel future when meet timeout in PointQueryExec (#21573)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 356ec731a243c509478c7a87ea7aa2aaf8a1c78e
Author: lihangyu <15...@163.com>
AuthorDate: Tue Jul 25 18:18:09 2023 +0800
[Bug](point query) cancel future when meet timeout in PointQueryExec (#21573)
1. Cancel the future when a timeout occurs, and add a config option to modify the RPC timeout.
2. Add a config option to modify the number of BackendServiceProxy instances, since the gRPC channel can become blocked under a highly concurrent workload.
---
.../src/main/java/org/apache/doris/common/Config.java | 8 ++++++++
.../main/java/org/apache/doris/qe/PointQueryExec.java | 17 +++++++++--------
.../java/org/apache/doris/rpc/BackendServiceProxy.java | 2 +-
3 files changed, 18 insertions(+), 9 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index aa44a9aff3..970fe7eb5a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -399,6 +399,10 @@ public class Config extends ConfigBase {
@ConfField(description = {"MySQL 服务的最大任务线程数", "The max number of task threads in MySQL service"})
public static int max_mysql_service_task_threads_num = 4096;
+ @ConfField(description = {"BackendServiceProxy数量, 用于池化GRPC channel",
+ "BackendServiceProxy pool size for pooling GRPC channels."})
+ public static int backend_proxy_num = 48;
+
@ConfField(description = {
"集群 ID,用于内部认证。通常在集群第一次启动时,会随机生成一个 cluster id. 用户也可以手动指定。",
"Cluster id used for internal authentication. Usually a random integer generated when master FE "
@@ -459,6 +463,10 @@ public class Config extends ConfigBase {
"The timeout of RPC between FE and Broker, in milliseconds"})
public static int broker_timeout_ms = 10000; // 10s
+ @ConfField(description = {"主键高并发点查短路径超时时间。",
+ "The timeout of RPC for high concurrenty short circuit query"})
+ public static int point_query_timeout_ms = 10000; // 10s
+
@ConfField(mutable = true, masterOnly = true, description = {"Insert load 的默认超时时间,单位是秒。",
"Default timeout for insert load job, in seconds."})
public static int insert_load_default_timeout_second = 14400; // 4 hour
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index e62e849be0..bda52b94ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -63,7 +63,7 @@ public class PointQueryExec {
private ArrayList<Expr> outputExprs;
private DescriptorTable descriptorTable;
private long tabletID = 0;
- private long timeoutMs = 1000; // default 1s
+ private long timeoutMs = Config.point_query_timeout_ms; // default 10s
private boolean isCancel = false;
private boolean isBinaryProtocol = false;
@@ -185,7 +185,7 @@ public class PointQueryExec {
while (pResult == null) {
InternalService.PTabletKeyLookupRequest request = requestBuilder.build();
Future<InternalService.PTabletKeyLookupResponse> futureResponse =
- BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request);
+ BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request);
long currentTs = System.currentTimeMillis();
if (currentTs >= timeoutTs) {
LOG.warn("fetch result timeout {}", backend.getBrpcAdress());
@@ -201,15 +201,20 @@ public class PointQueryExec {
status.setStatus(Status.CANCELLED);
return null;
}
+ } catch (TimeoutException e) {
+ futureResponse.cancel(true);
+ LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAdress());
+ status.setStatus("query timeout");
+ return null;
}
}
} catch (RpcException e) {
- LOG.warn("fetch result rpc exception {}", backend.getBrpcAdress());
+ LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAdress(), e);
status.setRpcStatus(e.getMessage());
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
return null;
} catch (ExecutionException e) {
- LOG.warn("fetch result execution exception {}", backend.getBrpcAdress());
+ LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAdress());
if (e.getMessage().contains("time out")) {
// if timeout, we set error code to TIMEOUT, and it will not retry querying.
status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage()));
@@ -218,10 +223,6 @@ public class PointQueryExec {
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
}
return null;
- } catch (TimeoutException e) {
- LOG.warn("fetch result timeout {}", backend.getBrpcAdress());
- status.setStatus("query timeout");
- return null;
}
TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index c7c6a144c6..39dfd7915f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -61,7 +61,7 @@ public class BackendServiceProxy {
}
private static class Holder {
- private static final int PROXY_NUM = 20;
+ private static final int PROXY_NUM = Config.backend_proxy_num;
private static BackendServiceProxy[] proxies = new BackendServiceProxy[PROXY_NUM];
private static AtomicInteger count = new AtomicInteger();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org