You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/03 23:03:24 UTC

[pulsar] branch master updated: [Client] Continue shutdown of client when closing producers and consumers times out (#15921)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d73fce28c84 [Client] Continue shutdown of client when closing producers and consumers times out (#15921)
d73fce28c84 is described below

commit d73fce28c84559bc956adbf223cdc39d41c6bfc2
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Jun 4 02:03:16 2022 +0300

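    [Client] Continue shutdown of client when closing producers and consumers times out (#15921)
    
    - don't return the exception to the caller: failures while closing producers/consumers
      or during shutdown are logged, and the close future still completes normally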
---
 .../pulsar/client/impl/PulsarClientImpl.java       | 23 +++++++++++-----------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b6f4050b452..8a7d50f0d4b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -750,24 +750,23 @@ public class PulsarClientImpl implements PulsarClient {
         FutureUtil.addTimeoutHandling(combinedFuture, Duration.ofSeconds(CLOSE_TIMEOUT_SECONDS),
                 shutdownExecutor, () -> FutureUtil.createTimeoutException("Closing producers and consumers timed out.",
                         PulsarClientImpl.class, "closeAsync"));
-        combinedFuture.whenComplete((__, t) -> new Thread(() -> {
+        combinedFuture.handle((__, t) -> {
             if (t != null) {
                 log.error("Closing producers and consumers failed. Continuing with shutdown.", t);
             }
-            shutdownExecutor.shutdownNow();
-            // All producers & consumers are now closed, we can stop the client safely
-            try {
-                shutdown();
-                closeFuture.complete(null);
+            new Thread(() -> {
+                shutdownExecutor.shutdownNow();
+                // All producers & consumers are now closed, we can stop the client safely
+                try {
+                    shutdown();
+                } catch (PulsarClientException e) {
+                    log.error("Shutdown failed. Ignoring the exception.", e);
+                }
                 state.set(State.Closed);
-            } catch (PulsarClientException e) {
-                closeFuture.completeExceptionally(e);
-            }
-        }, "pulsar-client-shutdown-thread").start()).exceptionally(exception -> {
-            closeFuture.completeExceptionally(exception);
+                closeFuture.complete(null);
+            }, "pulsar-client-shutdown-thread").start();
             return null;
         });
-
         return closeFuture;
     }