You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/02/03 02:03:26 UTC

[pinot] branch master updated: Add a broker metric to distinguish exception happens when acquire channel lock or when send request to server (#8105)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b26df6  Add a broker metric to distinguish exception happens when acquire channel lock or when send request to server (#8105)
8b26df6 is described below

commit 8b26df6b758cde4445e8bf3682a325226fd020a9
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Wed Feb 2 18:02:59 2022 -0800

    Add a broker metric to distinguish exception happens when acquire channel lock or when send request to server (#8105)
    
    * Add a broker metric to distinguish exceptions that happen while acquiring the channel lock from those that happen while sending the request to the server
    
    * don't use reflection in catch block
    
    * compare timeout message
---
 .../apache/pinot/common/metrics/BrokerMeter.java   |  1 +
 .../apache/pinot/core/transport/QueryRouter.java   | 24 ++++++++++++++++------
 .../pinot/core/transport/ServerChannels.java       |  3 ++-
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 0d505cf..67dbb37 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -43,6 +43,7 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
   // Scatter phase.
   NO_SERVER_FOUND_EXCEPTIONS("exceptions", false),
   REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS("exceptions", false),
+  REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS("exceptions", false),
   REQUEST_SEND_EXCEPTIONS("exceptions", false),
   // Gather phase.
   RESPONSE_FETCH_EXCEPTIONS("exceptions", false),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 40c32c8..db08d96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -119,15 +120,18 @@ public class QueryRouter {
       ServerRoutingInstance serverRoutingInstance = entry.getKey();
       ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels;
       try {
-        serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(),
-            timeoutMs);
+        serverChannels
+            .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(), timeoutMs);
         asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
+      } catch (TimeoutException e) {
+        if (ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG.equals(e.getMessage())) {
+          _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS, 1);
+        }
+        markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e);
+        break;
       } catch (Exception e) {
-        LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId,
-            serverRoutingInstance, e);
         _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1);
-        asyncQueryResponse.setBrokerRequestSendException(e);
-        asyncQueryResponse.markQueryFailed();
+        markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e);
         break;
       }
     }
@@ -135,6 +139,14 @@ public class QueryRouter {
     return asyncQueryResponse;
   }
 
+  private void markQueryFailed(long requestId, ServerRoutingInstance serverRoutingInstance,
+      AsyncQueryResponse asyncQueryResponse, Exception e) {
+    LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId,
+        serverRoutingInstance, e);
+    asyncQueryResponse.setBrokerRequestSendException(e);
+    asyncQueryResponse.markQueryFailed();
+  }
+
   public void shutDown() {
     _serverChannels.shutDown();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index e5b422a..2e6a77c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -53,6 +53,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
  */
 @ThreadSafe
 public class ServerChannels {
+  public static final String CHANNEL_LOCK_TIMEOUT_MSG = "Timeout while acquiring channel lock";
   private final QueryRouter _queryRouter;
   private final BrokerMetrics _brokerMetrics;
   private final TSerializer _serializer = new TSerializer(new TCompactProtocol.Factory());
@@ -154,7 +155,7 @@ public class ServerChannels {
           _channelLock.unlock();
         }
       } else {
-        throw new TimeoutException("Timeout while acquiring channel lock");
+        throw new TimeoutException(CHANNEL_LOCK_TIMEOUT_MSG);
       }
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org