You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/06/18 21:19:46 UTC
[pinot] branch master updated: properly shutdown query client with await termination (#8922)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e159846ecc properly shutdown query client with await termination (#8922)
e159846ecc is described below
commit e159846ecc9117ac93e0b655cb03f0dc8f0a4ba6
Author: Rong Rong <ro...@apache.org>
AuthorDate: Sat Jun 18 14:19:40 2022 -0700
properly shutdown query client with await termination (#8922)
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../broker/requesthandler/GrpcBrokerRequestHandler.java | 7 +++++++
.../apache/pinot/common/utils/grpc/GrpcQueryClient.java | 15 ++++++++++++++-
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 0bd33a9289..0b3744f018 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -78,6 +78,7 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
@Override
public synchronized void shutDown() {
+ _streamingQueryClient.shutdown();
_streamingReduceService.shutDown();
}
@@ -146,5 +147,11 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
String key = String.format("%s_%d", host, port);
return _grpcQueryClientMap.computeIfAbsent(key, k -> new GrpcQueryClient(host, port, _config));
}
+
+ public void shutdown() {
+ for (GrpcQueryClient client : _grpcQueryClientMap.values()) {
+ client.close();
+ }
+ }
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index bbb70d85e9..df074d2581 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -27,6 +27,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
@@ -35,9 +36,14 @@ import org.apache.pinot.common.proto.PinotQueryServerGrpc;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GrpcQueryClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryClient.class);
+ private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10;
+
private final ManagedChannel _managedChannel;
private final PinotQueryServerGrpc.PinotQueryServerBlockingStub _blockingStub;
@@ -83,7 +89,14 @@ public class GrpcQueryClient {
public void close() {
if (!_managedChannel.isShutdown()) {
- _managedChannel.shutdownNow();
+ try {
+ _managedChannel.shutdownNow();
+ if (!_managedChannel.awaitTermination(DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND, TimeUnit.SECONDS)) {
+ LOGGER.warn("Timed out forcefully shutting down connection: {}. ", _managedChannel);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Unexpected exception while waiting for channel termination", e);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org