You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/02/03 02:03:26 UTC
[pinot] branch master updated: Add a broker metric to distinguish exception happens when acquire channel lock or when send request to server (#8105)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8b26df6 Add a broker metric to distinguish exception happens when acquire channel lock or when send request to server (#8105)
8b26df6 is described below
commit 8b26df6b758cde4445e8bf3682a325226fd020a9
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Wed Feb 2 18:02:59 2022 -0800
Add a broker metric to distinguish exception happens when acquire channel lock or when send request to server (#8105)
* Add a broker metric to distinguish exception happens when acquire channel lock or send request to server
* don't use reflection in catch block
* compare timeut message
---
.../apache/pinot/common/metrics/BrokerMeter.java | 1 +
.../apache/pinot/core/transport/QueryRouter.java | 24 ++++++++++++++++------
.../pinot/core/transport/ServerChannels.java | 3 ++-
3 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 0d505cf..67dbb37 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -43,6 +43,7 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
// Scatter phase.
NO_SERVER_FOUND_EXCEPTIONS("exceptions", false),
REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS("exceptions", false),
+ REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS("exceptions", false),
REQUEST_SEND_EXCEPTIONS("exceptions", false),
// Gather phase.
RESPONSE_FETCH_EXCEPTIONS("exceptions", false),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 40c32c8..db08d96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -119,15 +120,18 @@ public class QueryRouter {
ServerRoutingInstance serverRoutingInstance = entry.getKey();
ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels;
try {
- serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(),
- timeoutMs);
+ serverChannels
+ .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(), timeoutMs);
asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
+ } catch (TimeoutException e) {
+ if (ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG.equals(e.getMessage())) {
+ _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS, 1);
+ }
+ markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e);
+ break;
} catch (Exception e) {
- LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId,
- serverRoutingInstance, e);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1);
- asyncQueryResponse.setBrokerRequestSendException(e);
- asyncQueryResponse.markQueryFailed();
+ markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e);
break;
}
}
@@ -135,6 +139,14 @@ public class QueryRouter {
return asyncQueryResponse;
}
+ private void markQueryFailed(long requestId, ServerRoutingInstance serverRoutingInstance,
+ AsyncQueryResponse asyncQueryResponse, Exception e) {
+ LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId,
+ serverRoutingInstance, e);
+ asyncQueryResponse.setBrokerRequestSendException(e);
+ asyncQueryResponse.markQueryFailed();
+ }
+
public void shutDown() {
_serverChannels.shutDown();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index e5b422a..2e6a77c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -53,6 +53,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
*/
@ThreadSafe
public class ServerChannels {
+ public static final String CHANNEL_LOCK_TIMEOUT_MSG = "Timeout while acquiring channel lock";
private final QueryRouter _queryRouter;
private final BrokerMetrics _brokerMetrics;
private final TSerializer _serializer = new TSerializer(new TCompactProtocol.Factory());
@@ -154,7 +155,7 @@ public class ServerChannels {
_channelLock.unlock();
}
} else {
- throw new TimeoutException("Timeout while acquiring channel lock");
+ throw new TimeoutException(CHANNEL_LOCK_TIMEOUT_MSG);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org