You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/02/01 18:50:52 UTC
[pinot] branch master updated: Timeout if waiting server channel lock takes a long time (#8083)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 86fe41e Timeout if waiting server channel lock takes a long time (#8083)
86fe41e is described below
commit 86fe41e2e9c17090f1fa9bcc39dbd61e3b649402
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Tue Feb 1 10:50:17 2022 -0800
Timeout if waiting server channel lock takes a long time (#8083)
- make requests be able to time out early when "waiting the channel lock".
- move the request serialization logic out of critical section.
---
.../apache/pinot/core/transport/QueryRouter.java | 3 +-
.../pinot/core/transport/ServerChannels.java | 39 ++++++++++++++++------
2 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index c49ed54..40c32c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -119,7 +119,8 @@ public class QueryRouter {
ServerRoutingInstance serverRoutingInstance = entry.getKey();
ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels;
try {
- serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue());
+ serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(),
+ timeoutMs);
asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
} catch (Exception e) {
LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index 8f70009..e5b422a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -33,6 +33,8 @@ import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -53,6 +55,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
public class ServerChannels {
private final QueryRouter _queryRouter;
private final BrokerMetrics _brokerMetrics;
+ private final TSerializer _serializer = new TSerializer(new TCompactProtocol.Factory());
private final ConcurrentHashMap<ServerRoutingInstance, ServerChannel> _serverToChannelMap = new ConcurrentHashMap<>();
private final EventLoopGroup _eventLoopGroup = new NioEventLoopGroup();
private final TlsConfig _tlsConfig;
@@ -81,10 +84,11 @@ public class ServerChannels {
}
public void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
- ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest)
+ ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest, long timeoutMs)
throws Exception {
+ byte[] requestBytes = _serializer.serialize(instanceRequest);
_serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new)
- .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, instanceRequest);
+ .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes, timeoutMs);
}
public void shutDown() {
@@ -94,9 +98,10 @@ public class ServerChannels {
@ThreadSafe
private class ServerChannel {
- final TSerializer _serializer = new TSerializer(new TCompactProtocol.Factory());
final ServerRoutingInstance _serverRoutingInstance;
final Bootstrap _bootstrap;
+ // lock to protect channel as requests must be written into channel sequentially
+ final ReentrantLock _channelLock = new ReentrantLock();
Channel _channel;
ServerChannel(ServerRoutingInstance serverRoutingInstance) {
@@ -122,8 +127,8 @@ public class ServerChannels {
private void attachSSLHandler(SocketChannel ch) {
try {
- SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
- .sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider()));
+ SslContextBuilder sslContextBuilder =
+ SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider()));
if (_tlsConfig.getKeyStorePath() != null) {
sslContextBuilder.keyManager(TlsUtils.createKeyManagerFactory(_tlsConfig));
@@ -139,8 +144,22 @@ public class ServerChannels {
}
}
- synchronized void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
- ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest)
+ private void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
+ ServerRoutingInstance serverRoutingInstance, byte[] requestBytes, long timeoutMs)
+ throws Exception {
+ if (_channelLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
+ try {
+ sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes);
+ } finally {
+ _channelLock.unlock();
+ }
+ } else {
+ throw new TimeoutException("Timeout while acquiring channel lock");
+ }
+ }
+
+ private void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
+ ServerRoutingInstance serverRoutingInstance, byte[] requestBytes)
throws Exception {
if (_channel == null || !_channel.isActive()) {
long startTime = System.currentTimeMillis();
@@ -148,13 +167,11 @@ public class ServerChannels {
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.NETTY_CONNECTION_CONNECT_TIME_MS,
System.currentTimeMillis() - startTime);
}
- byte[] requestBytes = _serializer.serialize(instanceRequest);
long sendRequestStartTimeMs = System.currentTimeMillis();
_channel.writeAndFlush(Unpooled.wrappedBuffer(requestBytes)).addListener(f -> {
long requestSentLatencyMs = System.currentTimeMillis() - sendRequestStartTimeMs;
- _brokerMetrics
- .addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY, requestSentLatencyMs,
- TimeUnit.MILLISECONDS);
+ _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY,
+ requestSentLatencyMs, TimeUnit.MILLISECONDS);
asyncQueryResponse.markRequestSent(serverRoutingInstance, requestSentLatencyMs);
});
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_REQUESTS_SENT, 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org