Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/02 11:52:04 UTC

[GitHub] [pulsar] lhotari opened a new pull request #14538: [Java client] Add timeout to closeAsync so that closing a client doesn't block forever

lhotari opened a new pull request #14538:
URL: https://github.com/apache/pulsar/pull/14538


   Fixes #13964
   
   ### Motivation
   
   Closing a Pulsar client gets blocked or leaves unclosed resources behind in CI. 
   This causes builds to time out. Similar problems could happen in production code: closing the client blocks or leaves resources behind if closing a producer or a consumer completes with an exception.
   
   See #13964 for the issue report about closing getting blocked.
   
   Examples:
   
   PulsarClientImpl.shutdownEventLoopGroup stuck:
   
   [thread dump from stalled build job](https://github.com/apache/pulsar/runs/5389016985?check_suite_focus=true#step:12:1341)
   ```
   "main" #1 prio=5 os_prio=0 cpu=49933.58ms elapsed=347.19s tid=0x00007f86b4028000 nid=0x1f5ee in Object.wait()  [0x00007f86bbfb2000]
      java.lang.Thread.State: WAITING (on object monitor)
           at java.lang.Object.wait(java.base@11.0.14.1/Native Method)
           - waiting on <no object reference available>
           at java.lang.Object.wait(java.base@11.0.14.1/Object.java:328)
           at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:253)
           - waiting to re-lock in wait() <0x00000000d4c60d18> (a io.netty.util.concurrent.DefaultPromise)
           at io.netty.util.concurrent.DefaultPromise.get(DefaultPromise.java:337)
           at org.apache.pulsar.client.impl.PulsarClientImpl.shutdownEventLoopGroup(PulsarClientImpl.java:821)
           at org.apache.pulsar.client.impl.PulsarClientImpl.shutdown(PulsarClientImpl.java:767)
           at org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.internalCleanup(MockedPulsarServiceBaseTest.java:203)
           at org.apache.pulsar.broker.intercept.BrokerInterceptorTest.teardown(BrokerInterceptorTest.java:99)
   ```
   
   
   ### Modifications
   
   - don't terminate the closing sequence when exceptions occur in closing producers and consumers 
     - log exceptions when closing producers and consumers
   - add timeout handling for waiting for the closing of producers and consumers to complete
   - add timeout for shutting down eventLoopGroup
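
The timeout handling listed above can be sketched roughly as follows. This is a minimal, Java 8 compatible illustration and not the code in this PR: the class `CloseTimeoutSketch` and the helper `withTimeout` are hypothetical names, and the real change uses `FutureUtil.addTimeoutHandling`, whose implementation is not shown in this thread.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CloseTimeoutSketch {

    // Hypothetical helper: fails `future` with a TimeoutException if it is still
    // pending after `timeout`. Uses only Java 8 APIs.
    public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future,
                                                       Duration timeout,
                                                       ScheduledExecutorService scheduler,
                                                       String message) {
        ScheduledFuture<?> timeoutTask = scheduler.schedule(() -> {
            // No effect if the future has already completed.
            future.completeExceptionally(new TimeoutException(message));
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
        // Cancel the pending timer once the future completes on its own.
        future.whenComplete((result, error) -> timeoutTask.cancel(false));
        return future;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Stands in for a combined "close all producers and consumers" future
            // that never completes.
            CompletableFuture<Void> stuckClose = new CompletableFuture<>();
            withTimeout(stuckClose, Duration.ofMillis(100), scheduler,
                    "Closing producers and consumers timed out.");
            stuckClose.handle((ignored, error) -> {
                System.out.println("close finished, error = " + error);
                return null;
            }).join();
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

The scheduler is passed in explicitly because, during client shutdown, the client's own executors may already be stopping, so the caller has to supply one that is still alive.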


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #14538: [Java client] Add timeout to closeAsync so that closing a client doesn't block forever

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #14538:
URL: https://github.com/apache/pulsar/pull/14538#discussion_r817636144



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);
+            return null;
+        })));
+        consumers.forEach(c -> futures.add(c.closeAsync().handle((__, t) -> {
+            log.error("Error closing consumer {}", c, t);
+            return null;
+        })));
 
         // Need to run the shutdown sequence in a separate thread to prevent deadlocks
         // If there are consumers or producers that need to be shutdown we cannot use the same thread
         // to shutdown the EventLoopGroup as well as that would be trying to shutdown itself thus a deadlock
         // would happen
-        FutureUtil.waitForAll(futures).thenRun(() -> new Thread(() -> {
+        CompletableFuture<Void> combinedFuture = FutureUtil.waitForAll(futures);
+        ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(

Review comment:
       I don't think it's a problem from a performance perspective in any way. I admit that it's ugly.
   The other thread already existed, and it's needed to prevent deadlocks in certain cases.
   
   Adding timeout handling for futures requires a ScheduledExecutorService reference. Since this is part of shutdown, other executors will be shut down, which is why a new instance is needed.
   
   Btw, timeout handling for futures has existed in the JDK since Java 9 with [CompletableFuture#completeOnTimeout](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html#completeOnTimeout(T,long,java.util.concurrent.TimeUnit)). Since we must support Java 8, we need our own solution for CompletableFuture timeout handling. In Java 9+, there's an internal class called java.util.concurrent.CompletableFuture.Delayer which contains the single-threaded executor that the JDK uses for completeOnTimeout.
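
For comparison, here is a small sketch of the Java 9+ built-in mentioned in the comment above. It only illustrates `CompletableFuture#completeOnTimeout`; the class `BuiltInTimeoutSketch` and the helper `withFallback` are hypothetical names, and this approach is not usable in the Pulsar client as long as Java 8 has to be supported.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class BuiltInTimeoutSketch {

    // Java 9+ only: if `future` is still pending after `millis`, the JDK completes it
    // with `fallback`, using its own internal scheduler.
    public static <T> CompletableFuture<T> withFallback(CompletableFuture<T> future,
                                                        T fallback,
                                                        long millis) {
        return future.completeOnTimeout(fallback, millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        String result = withFallback(neverCompletes, "timed out", 100).join();
        System.out.println(result); // prints "timed out"
    }
}
```

No executor is passed in here, because the JDK supplies the scheduler itself.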
   







[GitHub] [pulsar] lhotari commented on a change in pull request #14538: [Java client] Add timeout to closeAsync so that closing a client doesn't block forever

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #14538:
URL: https://github.com/apache/pulsar/pull/14538#discussion_r817630255



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);
+            return null;
+        })));
+        consumers.forEach(c -> futures.add(c.closeAsync().handle((__, t) -> {
+            log.error("Error closing consumer {}", c, t);
+            return null;
+        })));
 
         // Need to run the shutdown sequence in a separate thread to prevent deadlocks
         // If there are consumers or producers that need to be shutdown we cannot use the same thread
         // to shutdown the EventLoopGroup as well as that would be trying to shutdown itself thus a deadlock
         // would happen
-        FutureUtil.waitForAll(futures).thenRun(() -> new Thread(() -> {
+        CompletableFuture<Void> combinedFuture = FutureUtil.waitForAll(futures);
+        ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-client-shutdown-timeout-scheduler"));
+        FutureUtil.addTimeoutHandling(combinedFuture, Duration.ofSeconds(CLOSE_TIMEOUT_SECONDS),
+                shutdownExecutor, () -> FutureUtil.createTimeoutException("Closing producers and consumers timed out.",
+                        PulsarClientImpl.class, "closeAsync"));
+        combinedFuture.whenComplete((__, t) -> new Thread(() -> {

Review comment:
       It does have a name. This thread already existed before this PR.







[GitHub] [pulsar] lhotari commented on a change in pull request #14538: [Java client] Add timeout to closeAsync so that closing a client doesn't block forever

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #14538:
URL: https://github.com/apache/pulsar/pull/14538#discussion_r817629857



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);

Review comment:
       thanks, I'll fix it.







[GitHub] [pulsar] lhotari merged pull request #14538: [Java client] Add timeout to closeAsync so that closing a client doesn't block forever

Posted by GitBox <gi...@apache.org>.
lhotari merged pull request #14538:
URL: https://github.com/apache/pulsar/pull/14538


   





[GitHub] [pulsar] eolivelli commented on a change in pull request #14538: [Java client] Add timeout to closeAsync so that closing a client doesn't block forever

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #14538:
URL: https://github.com/apache/pulsar/pull/14538#discussion_r817619359



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);

Review comment:
       this is an error only if "t" is not null
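
A minimal sketch of what this comment asks for, under stated assumptions: the class `CloseErrorLoggingSketch` and the helper `logOnlyFailures` are hypothetical, and a plain list stands in for the logger so that the example has no logging dependency. It is not the code that was eventually merged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseErrorLoggingSketch {

    // Hypothetical helper: turns a close future into one that always completes normally,
    // recording a message only when the close actually failed (t != null).
    public static CompletableFuture<Void> logOnlyFailures(String name,
                                                          CompletableFuture<Void> closeFuture,
                                                          List<String> errorSink) {
        return closeFuture.handle((ignored, t) -> {
            if (t != null) {
                // In the real client this would be a log.error(...) call.
                errorSink.add("Error closing " + name + ": " + t);
            }
            return null;
        });
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();

        CompletableFuture<Void> failedClose = new CompletableFuture<>();
        failedClose.completeExceptionally(new IllegalStateException("connection lost"));

        logOnlyFailures("producer-a", CompletableFuture.completedFuture(null), errors).join();
        logOnlyFailures("consumer-b", failedClose, errors).join();

        // Only the failed close is reported.
        errors.forEach(System.out::println);
    }
}
```

Because `handle` swallows the exception and returns `null`, one failing producer or consumer no longer aborts the rest of the closing sequence.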

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);
+            return null;
+        })));
+        consumers.forEach(c -> futures.add(c.closeAsync().handle((__, t) -> {
+            log.error("Error closing consumer {}", c, t);

Review comment:
       this is an error only if "t" is not null

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);
+            return null;
+        })));
+        consumers.forEach(c -> futures.add(c.closeAsync().handle((__, t) -> {
+            log.error("Error closing consumer {}", c, t);
+            return null;
+        })));
 
         // Need to run the shutdown sequence in a separate thread to prevent deadlocks
         // If there are consumers or producers that need to be shutdown we cannot use the same thread
         // to shutdown the EventLoopGroup as well as that would be trying to shutdown itself thus a deadlock
         // would happen
-        FutureUtil.waitForAll(futures).thenRun(() -> new Thread(() -> {
+        CompletableFuture<Void> combinedFuture = FutureUtil.waitForAll(futures);
+        ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-client-shutdown-timeout-scheduler"));
+        FutureUtil.addTimeoutHandling(combinedFuture, Duration.ofSeconds(CLOSE_TIMEOUT_SECONDS),
+                shutdownExecutor, () -> FutureUtil.createTimeoutException("Closing producers and consumers timed out.",
+                        PulsarClientImpl.class, "closeAsync"));
+        combinedFuture.whenComplete((__, t) -> new Thread(() -> {

Review comment:
       Shall we give this Thread a name and also set it as "daemon"?
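
As a rough illustration of the suggestion, a named daemon thread can be produced with a small `ThreadFactory`. The class `ShutdownThreadSketch`, the helper `namedDaemonFactory`, and the thread name are hypothetical; the PR itself uses Netty's `DefaultThreadFactory` for the scheduler thread.

```java
import java.util.concurrent.ThreadFactory;

public class ShutdownThreadSketch {

    // Hypothetical helper: returns a factory whose threads carry the given name and are
    // daemon threads, so they do not keep the JVM alive on their own.
    public static ThreadFactory namedDaemonFactory(String name) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        Thread shutdownThread = namedDaemonFactory("pulsar-client-shutdown-sketch").newThread(
                () -> System.out.println("running in " + Thread.currentThread().getName()));
        shutdownThread.start();
        shutdownThread.join();
    }
}
```

The same factory could also be passed to `Executors.newSingleThreadScheduledExecutor(ThreadFactory)` so that the timeout scheduler thread is named and daemonized as well.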
   
   

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);
+            return null;
+        })));
+        consumers.forEach(c -> futures.add(c.closeAsync().handle((__, t) -> {
+            log.error("Error closing consumer {}", c, t);
+            return null;
+        })));
 
         // Need to run the shutdown sequence in a separate thread to prevent deadlocks
         // If there are consumers or producers that need to be shutdown we cannot use the same thread
         // to shutdown the EventLoopGroup as well as that would be trying to shutdown itself thus a deadlock
         // would happen
-        FutureUtil.waitForAll(futures).thenRun(() -> new Thread(() -> {
+        CompletableFuture<Void> combinedFuture = FutureUtil.waitForAll(futures);
+        ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(

Review comment:
       Is it too heavyweight to create 2 threads (one here and one below) every time we "close" a PulsarClient?







[GitHub] [pulsar] lhotari commented on a change in pull request #14538: [Java client] Add timeout to closeAsync so that closing a client doesn't block forever

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #14538:
URL: https://github.com/apache/pulsar/pull/14538#discussion_r817637544



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);

Review comment:
       @eolivelli fixed now. PTAL

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -710,14 +713,30 @@ public void close() throws PulsarClientException {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        producers.forEach(p -> futures.add(p.closeAsync()));
-        consumers.forEach(c -> futures.add(c.closeAsync()));
+        producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
+            log.error("Error closing producer {}", p, t);
+            return null;
+        })));
+        consumers.forEach(c -> futures.add(c.closeAsync().handle((__, t) -> {
+            log.error("Error closing consumer {}", c, t);

Review comment:
       @eolivelli fixed now. PTAL



