You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/02/01 18:50:52 UTC

[pinot] branch master updated: Timeout if waiting server channel lock takes a long time (#8083)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 86fe41e  Timeout if waiting server channel lock takes a long time (#8083)
86fe41e is described below

commit 86fe41e2e9c17090f1fa9bcc39dbd61e3b649402
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Tue Feb 1 10:50:17 2022 -0800

    Timeout if waiting server channel lock takes a long time (#8083)
    
    - make requests be able to time out early when "waiting the channel lock".
    - move the request serialization logic out of critical section.
---
 .../apache/pinot/core/transport/QueryRouter.java   |  3 +-
 .../pinot/core/transport/ServerChannels.java       | 39 ++++++++++++++++------
 2 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index c49ed54..40c32c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -119,7 +119,8 @@ public class QueryRouter {
       ServerRoutingInstance serverRoutingInstance = entry.getKey();
       ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels;
       try {
-        serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue());
+        serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(),
+            timeoutMs);
         asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
       } catch (Exception e) {
         LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index 8f70009..e5b422a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -33,6 +33,8 @@ import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -53,6 +55,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
 public class ServerChannels {
   private final QueryRouter _queryRouter;
   private final BrokerMetrics _brokerMetrics;
+  private final TSerializer _serializer = new TSerializer(new TCompactProtocol.Factory());
   private final ConcurrentHashMap<ServerRoutingInstance, ServerChannel> _serverToChannelMap = new ConcurrentHashMap<>();
   private final EventLoopGroup _eventLoopGroup = new NioEventLoopGroup();
   private final TlsConfig _tlsConfig;
@@ -81,10 +84,11 @@ public class ServerChannels {
   }
 
   public void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
-      ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest)
+      ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest, long timeoutMs)
       throws Exception {
+    byte[] requestBytes = _serializer.serialize(instanceRequest);
     _serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new)
-        .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, instanceRequest);
+        .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes, timeoutMs);
   }
 
   public void shutDown() {
@@ -94,9 +98,10 @@ public class ServerChannels {
 
   @ThreadSafe
   private class ServerChannel {
-    final TSerializer _serializer = new TSerializer(new TCompactProtocol.Factory());
     final ServerRoutingInstance _serverRoutingInstance;
     final Bootstrap _bootstrap;
+    // lock to protect channel as requests must be written into channel sequentially
+    final ReentrantLock _channelLock = new ReentrantLock();
     Channel _channel;
 
     ServerChannel(ServerRoutingInstance serverRoutingInstance) {
@@ -122,8 +127,8 @@ public class ServerChannels {
 
     private void attachSSLHandler(SocketChannel ch) {
       try {
-        SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
-            .sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider()));
+        SslContextBuilder sslContextBuilder =
+            SslContextBuilder.forClient().sslProvider(SslProvider.valueOf(_tlsConfig.getSslProvider()));
 
         if (_tlsConfig.getKeyStorePath() != null) {
           sslContextBuilder.keyManager(TlsUtils.createKeyManagerFactory(_tlsConfig));
@@ -139,8 +144,22 @@ public class ServerChannels {
       }
     }
 
-    synchronized void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
-        ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest)
+    private void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
+        ServerRoutingInstance serverRoutingInstance, byte[] requestBytes, long timeoutMs)
+        throws Exception {
+      if (_channelLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
+        try {
+          sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes);
+        } finally {
+          _channelLock.unlock();
+        }
+      } else {
+        throw new TimeoutException("Timeout while acquiring channel lock");
+      }
+    }
+
+    private void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
+        ServerRoutingInstance serverRoutingInstance, byte[] requestBytes)
         throws Exception {
       if (_channel == null || !_channel.isActive()) {
         long startTime = System.currentTimeMillis();
@@ -148,13 +167,11 @@ public class ServerChannels {
         _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.NETTY_CONNECTION_CONNECT_TIME_MS,
             System.currentTimeMillis() - startTime);
       }
-      byte[] requestBytes = _serializer.serialize(instanceRequest);
       long sendRequestStartTimeMs = System.currentTimeMillis();
       _channel.writeAndFlush(Unpooled.wrappedBuffer(requestBytes)).addListener(f -> {
         long requestSentLatencyMs = System.currentTimeMillis() - sendRequestStartTimeMs;
-        _brokerMetrics
-            .addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY, requestSentLatencyMs,
-                TimeUnit.MILLISECONDS);
+        _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY,
+            requestSentLatencyMs, TimeUnit.MILLISECONDS);
         asyncQueryResponse.markRequestSent(serverRoutingInstance, requestSentLatencyMs);
       });
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_REQUESTS_SENT, 1);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org