You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/01/28 02:57:41 UTC

[incubator-doris] branch master updated: Fix some problems related to thrift rpc when use nonblokcing IO model (#5117)

This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e774314  Fix some problems related to thrift rpc when use nonblokcing IO model (#5117)
e774314 is described below

commit e774314ffb4109c00135153c6ad9abe655d6c5e5
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Thu Jan 28 10:57:30 2021 +0800

    Fix some problems related to thrift rpc when use nonblokcing IO model (#5117)
    
    * Fix some problems related to thrift rpc when use nonblokcing IO model
    
    Co-authored-by: caiconghui [蔡聪辉] <ca...@xiaomi.com>
---
 be/src/common/config.h                             |  4 ++--
 be/src/runtime/client_cache.h                      |  6 ++---
 be/src/util/thrift_client.h                        |  2 +-
 docs/en/installing/install-deploy.md               |  2 +-
 docs/zh-CN/installing/install-deploy.md            |  2 +-
 .../java/org/apache/doris/common/ClientPool.java   |  6 +++--
 .../java/org/apache/doris/common/GenericPool.java  | 28 ++++++++++++++++------
 .../java/org/apache/doris/common/ThriftServer.java |  6 ++---
 .../doris/common/ThriftServerEventProcessor.java   | 13 +++++-----
 .../java/org/apache/doris/system/HeartbeatMgr.java |  2 +-
 10 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4d0132d..4d78596 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -558,8 +558,8 @@ CONF_Int32(query_cache_max_partition_count, "1024");
 // This is to avoid too many version num.
 CONF_mInt32(max_tablet_version_num, "500");
 
-// Frontend mainly use two thrift sever type: THREAD_POOL, THREADED. if fe use THREADED model for thrift server,
-// the thrift_server_type_of_fe should be set THREADED to make be thrift client to fe constructed with TFramedTransport
+// Frontend mainly use two thrift sever type: THREAD_POOL, THREADED_SELECTOR. if fe use THREADED_SELECTOR model for thrift server,
+// the thrift_server_type_of_fe should be set THREADED_SELECTOR to make be thrift client to fe constructed with TFramedTransport
 CONF_String(thrift_server_type_of_fe, "THREAD_POOL");
 
 // disable zone map index when page row is too few
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index f3bc2a3..b602a2c 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -265,10 +265,10 @@ private:
         transform(thrift_server_type.begin(), thrift_server_type.end(), thrift_server_type.begin(),
                   toupper);
         if (strcmp(typeid(T).name(), "N5doris21FrontendServiceClientE") == 0 &&
-            thrift_server_type == "THREADED") {
-            return ThriftServer::ServerType::THREADED;
+            thrift_server_type == "THREADED_SELECTOR") {
+            return ThriftServer::ServerType::NON_BLOCKING;
         } else {
-            return ThriftServer::ServerType::THREAD_POOL;
+            return ThriftServer::ServerType::THREADED;
         }
     }
 };
diff --git a/be/src/util/thrift_client.h b/be/src/util/thrift_client.h
index 84122d3..e555914 100644
--- a/be/src/util/thrift_client.h
+++ b/be/src/util/thrift_client.h
@@ -118,9 +118,9 @@ ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port
         : ThriftClientImpl(ipaddress, port), _iface(new InterfaceType(_protocol)) {
     switch (server_type) {
     case ThriftServer::NON_BLOCKING:
-    case ThriftServer::THREADED:
         _transport.reset(new apache::thrift::transport::TFramedTransport(_socket));
         break;
+    case ThriftServer::THREADED:
     case ThriftServer::THREAD_POOL:
         _transport.reset(new apache::thrift::transport::TBufferedTransport(_socket));
         break;
diff --git a/docs/en/installing/install-deploy.md b/docs/en/installing/install-deploy.md
index 17d2ea0..964c847 100644
--- a/docs/en/installing/install-deploy.md
+++ b/docs/en/installing/install-deploy.md
@@ -98,7 +98,7 @@ Doris instances communicate directly over the network. The following table shows
 | BE | heartbeat\_service_port | 9050 | FE - > BE | the heart beat service port (thrift) on BE, used to receive heartbeat from FE|
 | BE | brpc\_port* | 8060 | FE < - > BE, BE < - > BE | BE for communication between BEs|
 | FE | http_port* | 8030 | FE < - > FE, HTTP server port on user | FE|
-| FE | rpc_port | 9020 | BE - > FE, FE < - > FE | thrift server port on FE|
+| FE | rpc_port | 9020 | BE - > FE, FE < - > FE | thrift server port on FE, the configuration of each fe needs to be consistent|
 | FE | query_port | 9030 | user | FE|
 | FE | edit\_log_port | 9010 | FE <--> FE | FE|
 | Broker | broker ipc_port | 8000 | FE - > Broker, BE - > Broker | Broker for receiving requests|
diff --git a/docs/zh-CN/installing/install-deploy.md b/docs/zh-CN/installing/install-deploy.md
index 7c0b3af..7dc6379 100644
--- a/docs/zh-CN/installing/install-deploy.md
+++ b/docs/zh-CN/installing/install-deploy.md
@@ -95,7 +95,7 @@ Doris 各个实例直接通过网络进行通讯。以下表格展示了所有
 | BE | heartbeat\_service_port | 9050 | FE --> BE | BE 上心跳服务端口(thrift),用于接收来自 FE 的心跳 |
 | BE | brpc\_port* | 8060 | FE<-->BE, BE <--> BE | BE 上的 brpc 端口,用于 BE 之间通讯 |
 | FE | http_port * | 8030 | FE <--> FE,用户 |FE 上的 http server 端口 |
-| FE | rpc_port | 9020 | BE --> FE, FE <--> FE | FE 上的 thrift server 端口 |
+| FE | rpc_port | 9020 | BE --> FE, FE <--> FE | FE 上的 thrift server 端口,每个fe的配置需要保持一致|
 | FE | query_port | 9030 | 用户 | FE 上的 mysql server 端口 |
 | FE | edit\_log_port | 9010 | FE <--> FE | FE 上的 bdbje 之间通信用的端口 |
 | Broker | broker\_ipc_port | 8000 | FE --> Broker, BE --> Broker | Broker 上的 thrift server,用于接收请求 |
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 62b3d12..4098e06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -65,9 +65,11 @@ public class ClientPool {
     public static GenericPool<HeartbeatService.Client> backendHeartbeatPool =
             new GenericPool("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs);
     public static GenericPool<FrontendService.Client> frontendHeartbeatPool =
-            new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs);
+            new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs,
+                    Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
     public static GenericPool<FrontendService.Client> frontendPool =
-            new GenericPool("FrontendService", backendConfig, backendTimeoutMs);
+            new GenericPool("FrontendService", backendConfig, backendTimeoutMs,
+                    Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
     public static GenericPool<BackendService.Client> backendPool =
             new GenericPool("BackendService", backendConfig, backendTimeoutMs);
     public static GenericPool<TPaloBrokerService.Client> brokerPool =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/GenericPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/GenericPool.java
index a38cc4f..04de990 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/GenericPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/GenericPool.java
@@ -28,6 +28,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -39,12 +40,18 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient>  {
     private GenericKeyedObjectPool<TNetworkAddress, VALUE> pool;
     private String className;
     private int timeoutMs;
+    private boolean isNonBlockingIO;
 
-    public GenericPool(String className, GenericKeyedObjectPoolConfig config, int timeoutMs) {
+    public GenericPool(String className, GenericKeyedObjectPoolConfig config, int timeoutMs, boolean isNonBlockingIO) {
         this.className = "org.apache.doris.thrift." + className + "$Client";
         ThriftClientFactory factory = new ThriftClientFactory();
-        pool = new GenericKeyedObjectPool<TNetworkAddress, VALUE>(factory, config);
+        pool = new GenericKeyedObjectPool<>(factory, config);
         this.timeoutMs = timeoutMs;
+        this.isNonBlockingIO = isNonBlockingIO;
+    }
+
+    public GenericPool(String className, GenericKeyedObjectPoolConfig config, int timeoutMs) {
+        this(className, config, timeoutMs, false);
     }
 
     public boolean reopen(VALUE object, int timeoutMs) {
@@ -53,8 +60,11 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient>  {
         try {
             object.getOutputProtocol().getTransport().open();
             // transport.open() doesn't set timeout, Maybe the timeoutMs change.
-            TSocket socket = (TSocket) object.getOutputProtocol().getTransport();
-            socket.setTimeout(timeoutMs);
+            // here we cannot set timeoutMs for TFramedTransport, just skip it
+            if (!isNonBlockingIO) {
+                TSocket socket = (TSocket) object.getOutputProtocol().getTransport();
+                socket.setTimeout(timeoutMs);
+            }
         } catch (TTransportException e) {
             ok = false;
         }
@@ -87,8 +97,11 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient>  {
 
     public VALUE borrowObject(TNetworkAddress address, int timeoutMs) throws Exception {
         VALUE value = pool.borrowObject(address);
-        TSocket socket = (TSocket) (value.getOutputProtocol().getTransport());
-        socket.setTimeout(timeoutMs);
+        // here we cannot set timeoutMs for TFramedTransport, just skip it
+        if (!isNonBlockingIO) {
+            TSocket socket = (TSocket) (value.getOutputProtocol().getTransport());
+            socket.setTimeout(timeoutMs);
+        }
         return value;
     }
 
@@ -124,7 +137,8 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient>  {
                 LOG.debug("before create socket hostname={} key.port={} timeoutMs={}",
                         key.hostname, key.port, timeoutMs);
             }
-            TTransport transport = new TSocket(key.hostname, key.port, timeoutMs);
+            TTransport transport = isNonBlockingIO ? new TFramedTransport(new TSocket(key.hostname, key.port, timeoutMs)) :
+                    new TSocket(key.hostname, key.port, timeoutMs);
             transport.open();
             TProtocol protocol = new TBinaryProtocol(transport);
             VALUE client = (VALUE) newInstance(className, protocol);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
index c6892ca..3eaf981 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
@@ -48,14 +48,14 @@ public class ThriftServer {
     private Set<TNetworkAddress> connects;
 
     public static final String SIMPLE = "SIMPLE";
-    public static final String THREADED = "THREADED";
+    public static final String THREADED_SELECTOR = "THREADED_SELECTOR";
     public static final String THREAD_POOL = "THREAD_POOL";
 
     public enum ThriftServerType {
         // TSimplerServer
         SIMPLE(ThriftServer.SIMPLE),
         // TThreadedSelectorServer
-        THREADED(ThriftServer.THREADED),
+        THREADED_SELECTOR(ThriftServer.THREADED_SELECTOR),
         // TThreadPoolServer
         THREAD_POOL(ThriftServer.THREAD_POOL);
 
@@ -125,7 +125,7 @@ public class ThriftServer {
                 case SIMPLE:
                     createSimpleServer();
                     break;
-                case THREADED:
+                case THREADED_SELECTOR:
                     createThreadedServer();
                     break;
                 default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServerEventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServerEventProcessor.java
index 74be897..f47f23f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServerEventProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServerEventProcessor.java
@@ -58,15 +58,14 @@ public class ThriftServerEventProcessor implements TServerEventHandler {
         // param input is class org.apache.thrift.protocol.TBinaryProtocol
         TSocket tSocket = null;
         TTransport transport = input.getTransport();
-
         switch (thriftServer.getType()) {
-            case THREADED:
+            case THREADED_SELECTOR:
                 // class org.apache.thrift.transport.TFramedTransport
                 Preconditions.checkState(transport instanceof TFramedTransport);
-                TFramedTransport framedTransport = (TFramedTransport) transport;
                 // NOTE: we need patch code in TNonblockingServer, we don't use for now.
-                //  see https://issues.apache.org/jira/browse/THRIFT-1053
-                break;
+                //  see https://issues.apache.org/jira/browse/THRI FT-1053
+                LOG.debug("TFramedTransport cannot create thrift context. server type: {}", thriftServer.getType());
+                return null;
             case SIMPLE:
             case THREAD_POOL:
                 // org.apache.thrift.transport.TSocket
@@ -93,7 +92,7 @@ public class ThriftServerEventProcessor implements TServerEventHandler {
 
         thriftServer.addConnect(clientAddress);
 
-        LOG.debug("create thrift context. client: {}", clientAddress);
+        LOG.debug("create thrift context. client: {}, server type: {}", clientAddress, thriftServer.getType());
         return new ThriftServerContext(clientAddress);
     }
 
@@ -108,7 +107,7 @@ public class ThriftServerEventProcessor implements TServerEventHandler {
         TNetworkAddress clientAddress = thriftServerContext.getClient();
         connectionContext.remove();
         thriftServer.removeConnect(clientAddress);
-        LOG.debug("delete thrift context. client: {}", clientAddress);
+        LOG.debug("delete thrift context. client: {}, server type: {}", clientAddress, thriftServer.getType());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 3b197b8..22201e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -342,7 +342,7 @@ public class HeartbeatMgr extends MasterDaemon {
 
         private HeartbeatResponse getHeartbeatResponseByThrift() {
             FrontendService.Client client = null;
-            TNetworkAddress addr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
+            TNetworkAddress addr = new TNetworkAddress(fe.getHost(), Config.rpc_port);
             boolean ok = false;
             try {
                 client = ClientPool.frontendHeartbeatPool.borrowObject(addr);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org