You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/03 23:03:24 UTC
[pulsar] branch master updated: [Client] Continue shutdown of client when closing producers and consumers timeout (#15921)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d73fce28c84 [Client] Continue shutdown of client when closing producers and consumers timeout (#15921)
d73fce28c84 is described below
commit d73fce28c84559bc956adbf223cdc39d41c6bfc2
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Jun 4 02:03:16 2022 +0300
[Client] Continue shutdown of client when closing producers and consumers timeout (#15921)
- Don't return the exception to the caller: log the failure, continue the shutdown, and complete the close future normally
---
.../pulsar/client/impl/PulsarClientImpl.java | 23 +++++++++++-----------
1 file changed, 11 insertions(+), 12 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b6f4050b452..8a7d50f0d4b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -750,24 +750,23 @@ public class PulsarClientImpl implements PulsarClient {
FutureUtil.addTimeoutHandling(combinedFuture, Duration.ofSeconds(CLOSE_TIMEOUT_SECONDS),
shutdownExecutor, () -> FutureUtil.createTimeoutException("Closing producers and consumers timed out.",
PulsarClientImpl.class, "closeAsync"));
- combinedFuture.whenComplete((__, t) -> new Thread(() -> {
+ combinedFuture.handle((__, t) -> {
if (t != null) {
log.error("Closing producers and consumers failed. Continuing with shutdown.", t);
}
- shutdownExecutor.shutdownNow();
- // All producers & consumers are now closed, we can stop the client safely
- try {
- shutdown();
- closeFuture.complete(null);
+ new Thread(() -> {
+ shutdownExecutor.shutdownNow();
+ // All producers & consumers are now closed, we can stop the client safely
+ try {
+ shutdown();
+ } catch (PulsarClientException e) {
+ log.error("Shutdown failed. Ignoring the exception.", e);
+ }
state.set(State.Closed);
- } catch (PulsarClientException e) {
- closeFuture.completeExceptionally(e);
- }
- }, "pulsar-client-shutdown-thread").start()).exceptionally(exception -> {
- closeFuture.completeExceptionally(exception);
+ closeFuture.complete(null);
+ }, "pulsar-client-shutdown-thread").start();
return null;
});
-
return closeFuture;
}