You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/25 14:41:03 UTC

[doris] 27/31: [Bug](point query) cancel future when meet timeout in PointQueryExec (#21573)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 356ec731a243c509478c7a87ea7aa2aaf8a1c78e
Author: lihangyu <15...@163.com>
AuthorDate: Tue Jul 25 18:18:09 2023 +0800

    [Bug](point query) cancel future when meet timeout in PointQueryExec (#21573)
    
    1. Cancel the future when a timeout occurs, and add a config to modify the RPC timeout.
    2. Add a config to modify the number of BackendServiceProxy instances, since under a highly concurrent workload the GRPC channel will be blocked.
---
 .../src/main/java/org/apache/doris/common/Config.java   |  8 ++++++++
 .../main/java/org/apache/doris/qe/PointQueryExec.java   | 17 +++++++++--------
 .../java/org/apache/doris/rpc/BackendServiceProxy.java  |  2 +-
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index aa44a9aff3..970fe7eb5a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -399,6 +399,10 @@ public class Config extends ConfigBase {
     @ConfField(description = {"MySQL 服务的最大任务线程数", "The max number of task threads in MySQL service"})
     public static int max_mysql_service_task_threads_num = 4096;
 
+    @ConfField(description = {"BackendServiceProxy数量, 用于池化GRPC channel",
+            "BackendServiceProxy pool size for pooling GRPC channels."})
+    public static int backend_proxy_num = 48;
+
     @ConfField(description = {
             "集群 ID,用于内部认证。通常在集群第一次启动时,会随机生成一个 cluster id. 用户也可以手动指定。",
             "Cluster id used for internal authentication. Usually a random integer generated when master FE "
@@ -459,6 +463,10 @@ public class Config extends ConfigBase {
             "The timeout of RPC between FE and Broker, in milliseconds"})
     public static int broker_timeout_ms = 10000; // 10s
 
+    @ConfField(description = {"主键高并发点查短路径超时时间。",
+            "The timeout of RPC for high concurrenty short circuit query"})
+    public static int point_query_timeout_ms = 10000; // 10s
+
     @ConfField(mutable = true, masterOnly = true, description = {"Insert load 的默认超时时间,单位是秒。",
             "Default timeout for insert load job, in seconds."})
     public static int insert_load_default_timeout_second = 14400; // 4 hour
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index e62e849be0..bda52b94ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -63,7 +63,7 @@ public class PointQueryExec {
     private ArrayList<Expr> outputExprs;
     private DescriptorTable descriptorTable;
     private long tabletID = 0;
-    private long timeoutMs = 1000; // default 1s
+    private long timeoutMs = Config.point_query_timeout_ms; // default 10s
 
     private boolean isCancel = false;
     private boolean isBinaryProtocol = false;
@@ -185,7 +185,7 @@ public class PointQueryExec {
             while (pResult == null) {
                 InternalService.PTabletKeyLookupRequest request = requestBuilder.build();
                 Future<InternalService.PTabletKeyLookupResponse> futureResponse =
-                        BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request);
+                         BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request);
                 long currentTs = System.currentTimeMillis();
                 if (currentTs >= timeoutTs) {
                     LOG.warn("fetch result timeout {}", backend.getBrpcAdress());
@@ -201,15 +201,20 @@ public class PointQueryExec {
                         status.setStatus(Status.CANCELLED);
                         return null;
                     }
+                } catch (TimeoutException e) {
+                    futureResponse.cancel(true);
+                    LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAdress());
+                    status.setStatus("query timeout");
+                    return null;
                 }
             }
         } catch (RpcException e) {
-            LOG.warn("fetch result rpc exception {}", backend.getBrpcAdress());
+            LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAdress(), e);
             status.setRpcStatus(e.getMessage());
             SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
             return null;
         } catch (ExecutionException e) {
-            LOG.warn("fetch result execution exception {}", backend.getBrpcAdress());
+            LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAdress());
             if (e.getMessage().contains("time out")) {
                 // if timeout, we set error code to TIMEOUT, and it will not retry querying.
                 status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage()));
@@ -218,10 +223,6 @@ public class PointQueryExec {
                 SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
             }
             return null;
-        } catch (TimeoutException e) {
-            LOG.warn("fetch result timeout {}", backend.getBrpcAdress());
-            status.setStatus("query timeout");
-            return null;
         }
         TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
         if (code != TStatusCode.OK) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index c7c6a144c6..39dfd7915f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -61,7 +61,7 @@ public class BackendServiceProxy {
     }
 
     private static class Holder {
-        private static final int PROXY_NUM = 20;
+        private static final int PROXY_NUM = Config.backend_proxy_num;
         private static BackendServiceProxy[] proxies = new BackendServiceProxy[PROXY_NUM];
         private static AtomicInteger count = new AtomicInteger();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org