You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/01/28 02:57:41 UTC
[incubator-doris] branch master updated: Fix some problems related
to thrift rpc when use nonblokcing IO model (#5117)
This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e774314 Fix some problems related to thrift rpc when use nonblokcing IO model (#5117)
e774314 is described below
commit e774314ffb4109c00135153c6ad9abe655d6c5e5
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Thu Jan 28 10:57:30 2021 +0800
Fix some problems related to thrift rpc when use nonblokcing IO model (#5117)
* Fix some problems related to thrift rpc when use nonblokcing IO model
Co-authored-by: caiconghui [蔡聪辉] <ca...@xiaomi.com>
---
be/src/common/config.h | 4 ++--
be/src/runtime/client_cache.h | 6 ++---
be/src/util/thrift_client.h | 2 +-
docs/en/installing/install-deploy.md | 2 +-
docs/zh-CN/installing/install-deploy.md | 2 +-
.../java/org/apache/doris/common/ClientPool.java | 6 +++--
.../java/org/apache/doris/common/GenericPool.java | 28 ++++++++++++++++------
.../java/org/apache/doris/common/ThriftServer.java | 6 ++---
.../doris/common/ThriftServerEventProcessor.java | 13 +++++-----
.../java/org/apache/doris/system/HeartbeatMgr.java | 2 +-
10 files changed, 43 insertions(+), 28 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4d0132d..4d78596 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -558,8 +558,8 @@ CONF_Int32(query_cache_max_partition_count, "1024");
// This is to avoid too many version num.
CONF_mInt32(max_tablet_version_num, "500");
-// Frontend mainly use two thrift sever type: THREAD_POOL, THREADED. if fe use THREADED model for thrift server,
-// the thrift_server_type_of_fe should be set THREADED to make be thrift client to fe constructed with TFramedTransport
+// Frontend mainly use two thrift sever type: THREAD_POOL, THREADED_SELECTOR. if fe use THREADED_SELECTOR model for thrift server,
+// the thrift_server_type_of_fe should be set THREADED_SELECTOR to make be thrift client to fe constructed with TFramedTransport
CONF_String(thrift_server_type_of_fe, "THREAD_POOL");
// disable zone map index when page row is too few
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index f3bc2a3..b602a2c 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -265,10 +265,10 @@ private:
transform(thrift_server_type.begin(), thrift_server_type.end(), thrift_server_type.begin(),
toupper);
if (strcmp(typeid(T).name(), "N5doris21FrontendServiceClientE") == 0 &&
- thrift_server_type == "THREADED") {
- return ThriftServer::ServerType::THREADED;
+ thrift_server_type == "THREADED_SELECTOR") {
+ return ThriftServer::ServerType::NON_BLOCKING;
} else {
- return ThriftServer::ServerType::THREAD_POOL;
+ return ThriftServer::ServerType::THREADED;
}
}
};
diff --git a/be/src/util/thrift_client.h b/be/src/util/thrift_client.h
index 84122d3..e555914 100644
--- a/be/src/util/thrift_client.h
+++ b/be/src/util/thrift_client.h
@@ -118,9 +118,9 @@ ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port
: ThriftClientImpl(ipaddress, port), _iface(new InterfaceType(_protocol)) {
switch (server_type) {
case ThriftServer::NON_BLOCKING:
- case ThriftServer::THREADED:
_transport.reset(new apache::thrift::transport::TFramedTransport(_socket));
break;
+ case ThriftServer::THREADED:
case ThriftServer::THREAD_POOL:
_transport.reset(new apache::thrift::transport::TBufferedTransport(_socket));
break;
diff --git a/docs/en/installing/install-deploy.md b/docs/en/installing/install-deploy.md
index 17d2ea0..964c847 100644
--- a/docs/en/installing/install-deploy.md
+++ b/docs/en/installing/install-deploy.md
@@ -98,7 +98,7 @@ Doris instances communicate directly over the network. The following table shows
| BE | heartbeat\_service_port | 9050 | FE - > BE | the heart beat service port (thrift) on BE, used to receive heartbeat from FE|
| BE | brpc\_port* | 8060 | FE < - > BE, BE < - > BE | BE for communication between BEs|
| FE | http_port* | 8030 | FE < - > FE, HTTP server port on user | FE|
-| FE | rpc_port | 9020 | BE - > FE, FE < - > FE | thrift server port on FE|
+| FE | rpc_port | 9020 | BE - > FE, FE < - > FE | thrift server port on FE, the configuration of each fe needs to be consistent|
| FE | query_port | 9030 | user | FE|
| FE | edit\_log_port | 9010 | FE <--> FE | FE|
| Broker | broker ipc_port | 8000 | FE - > Broker, BE - > Broker | Broker for receiving requests|
diff --git a/docs/zh-CN/installing/install-deploy.md b/docs/zh-CN/installing/install-deploy.md
index 7c0b3af..7dc6379 100644
--- a/docs/zh-CN/installing/install-deploy.md
+++ b/docs/zh-CN/installing/install-deploy.md
@@ -95,7 +95,7 @@ Doris 各个实例直接通过网络进行通讯。以下表格展示了所有
| BE | heartbeat\_service_port | 9050 | FE --> BE | BE 上心跳服务端口(thrift),用于接收来自 FE 的心跳 |
| BE | brpc\_port* | 8060 | FE<-->BE, BE <--> BE | BE 上的 brpc 端口,用于 BE 之间通讯 |
| FE | http_port * | 8030 | FE <--> FE,用户 |FE 上的 http server 端口 |
-| FE | rpc_port | 9020 | BE --> FE, FE <--> FE | FE 上的 thrift server 端口 |
+| FE | rpc_port | 9020 | BE --> FE, FE <--> FE | FE 上的 thrift server 端口,每个fe的配置需要保持一致|
| FE | query_port | 9030 | 用户 | FE 上的 mysql server 端口 |
| FE | edit\_log_port | 9010 | FE <--> FE | FE 上的 bdbje 之间通信用的端口 |
| Broker | broker\_ipc_port | 8000 | FE --> Broker, BE --> Broker | Broker 上的 thrift server,用于接收请求 |
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 62b3d12..4098e06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -65,9 +65,11 @@ public class ClientPool {
public static GenericPool<HeartbeatService.Client> backendHeartbeatPool =
new GenericPool("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs);
public static GenericPool<FrontendService.Client> frontendHeartbeatPool =
- new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs);
+ new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs,
+ Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
public static GenericPool<FrontendService.Client> frontendPool =
- new GenericPool("FrontendService", backendConfig, backendTimeoutMs);
+ new GenericPool("FrontendService", backendConfig, backendTimeoutMs,
+ Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
public static GenericPool<BackendService.Client> backendPool =
new GenericPool("BackendService", backendConfig, backendTimeoutMs);
public static GenericPool<TPaloBrokerService.Client> brokerPool =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/GenericPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/GenericPool.java
index a38cc4f..04de990 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/GenericPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/GenericPool.java
@@ -28,6 +28,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -39,12 +40,18 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient> {
private GenericKeyedObjectPool<TNetworkAddress, VALUE> pool;
private String className;
private int timeoutMs;
+ private boolean isNonBlockingIO;
- public GenericPool(String className, GenericKeyedObjectPoolConfig config, int timeoutMs) {
+ public GenericPool(String className, GenericKeyedObjectPoolConfig config, int timeoutMs, boolean isNonBlockingIO) {
this.className = "org.apache.doris.thrift." + className + "$Client";
ThriftClientFactory factory = new ThriftClientFactory();
- pool = new GenericKeyedObjectPool<TNetworkAddress, VALUE>(factory, config);
+ pool = new GenericKeyedObjectPool<>(factory, config);
this.timeoutMs = timeoutMs;
+ this.isNonBlockingIO = isNonBlockingIO;
+ }
+
+ public GenericPool(String className, GenericKeyedObjectPoolConfig config, int timeoutMs) {
+ this(className, config, timeoutMs, false);
}
public boolean reopen(VALUE object, int timeoutMs) {
@@ -53,8 +60,11 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient> {
try {
object.getOutputProtocol().getTransport().open();
// transport.open() doesn't set timeout, Maybe the timeoutMs change.
- TSocket socket = (TSocket) object.getOutputProtocol().getTransport();
- socket.setTimeout(timeoutMs);
+ // here we cannot set timeoutMs for TFramedTransport, just skip it
+ if (!isNonBlockingIO) {
+ TSocket socket = (TSocket) object.getOutputProtocol().getTransport();
+ socket.setTimeout(timeoutMs);
+ }
} catch (TTransportException e) {
ok = false;
}
@@ -87,8 +97,11 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient> {
public VALUE borrowObject(TNetworkAddress address, int timeoutMs) throws Exception {
VALUE value = pool.borrowObject(address);
- TSocket socket = (TSocket) (value.getOutputProtocol().getTransport());
- socket.setTimeout(timeoutMs);
+ // here we cannot set timeoutMs for TFramedTransport, just skip it
+ if (!isNonBlockingIO) {
+ TSocket socket = (TSocket) (value.getOutputProtocol().getTransport());
+ socket.setTimeout(timeoutMs);
+ }
return value;
}
@@ -124,7 +137,8 @@ public class GenericPool<VALUE extends org.apache.thrift.TServiceClient> {
LOG.debug("before create socket hostname={} key.port={} timeoutMs={}",
key.hostname, key.port, timeoutMs);
}
- TTransport transport = new TSocket(key.hostname, key.port, timeoutMs);
+ TTransport transport = isNonBlockingIO ? new TFramedTransport(new TSocket(key.hostname, key.port, timeoutMs)) :
+ new TSocket(key.hostname, key.port, timeoutMs);
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
VALUE client = (VALUE) newInstance(className, protocol);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
index c6892ca..3eaf981 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
@@ -48,14 +48,14 @@ public class ThriftServer {
private Set<TNetworkAddress> connects;
public static final String SIMPLE = "SIMPLE";
- public static final String THREADED = "THREADED";
+ public static final String THREADED_SELECTOR = "THREADED_SELECTOR";
public static final String THREAD_POOL = "THREAD_POOL";
public enum ThriftServerType {
// TSimplerServer
SIMPLE(ThriftServer.SIMPLE),
// TThreadedSelectorServer
- THREADED(ThriftServer.THREADED),
+ THREADED_SELECTOR(ThriftServer.THREADED_SELECTOR),
// TThreadPoolServer
THREAD_POOL(ThriftServer.THREAD_POOL);
@@ -125,7 +125,7 @@ public class ThriftServer {
case SIMPLE:
createSimpleServer();
break;
- case THREADED:
+ case THREADED_SELECTOR:
createThreadedServer();
break;
default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServerEventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServerEventProcessor.java
index 74be897..f47f23f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServerEventProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServerEventProcessor.java
@@ -58,15 +58,14 @@ public class ThriftServerEventProcessor implements TServerEventHandler {
// param input is class org.apache.thrift.protocol.TBinaryProtocol
TSocket tSocket = null;
TTransport transport = input.getTransport();
-
switch (thriftServer.getType()) {
- case THREADED:
+ case THREADED_SELECTOR:
// class org.apache.thrift.transport.TFramedTransport
Preconditions.checkState(transport instanceof TFramedTransport);
- TFramedTransport framedTransport = (TFramedTransport) transport;
// NOTE: we need patch code in TNonblockingServer, we don't use for now.
- // see https://issues.apache.org/jira/browse/THRIFT-1053
- break;
+ // see https://issues.apache.org/jira/browse/THRI FT-1053
+ LOG.debug("TFramedTransport cannot create thrift context. server type: {}", thriftServer.getType());
+ return null;
case SIMPLE:
case THREAD_POOL:
// org.apache.thrift.transport.TSocket
@@ -93,7 +92,7 @@ public class ThriftServerEventProcessor implements TServerEventHandler {
thriftServer.addConnect(clientAddress);
- LOG.debug("create thrift context. client: {}", clientAddress);
+ LOG.debug("create thrift context. client: {}, server type: {}", clientAddress, thriftServer.getType());
return new ThriftServerContext(clientAddress);
}
@@ -108,7 +107,7 @@ public class ThriftServerEventProcessor implements TServerEventHandler {
TNetworkAddress clientAddress = thriftServerContext.getClient();
connectionContext.remove();
thriftServer.removeConnect(clientAddress);
- LOG.debug("delete thrift context. client: {}", clientAddress);
+ LOG.debug("delete thrift context. client: {}, server type: {}", clientAddress, thriftServer.getType());
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 3b197b8..22201e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -342,7 +342,7 @@ public class HeartbeatMgr extends MasterDaemon {
private HeartbeatResponse getHeartbeatResponseByThrift() {
FrontendService.Client client = null;
- TNetworkAddress addr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
+ TNetworkAddress addr = new TNetworkAddress(fe.getHost(), Config.rpc_port);
boolean ok = false;
try {
client = ClientPool.frontendHeartbeatPool.borrowObject(addr);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org