You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 08:16:56 UTC

[pulsar] 02/06: [Broker] Add timeout to closing CoordinationServiceImpl (#15777)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 774e22c2f4726709f2cb0246054baae5a8218d59
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue May 31 17:12:54 2022 +0300

    [Broker] Add timeout to closing CoordinationServiceImpl (#15777)
    
    Fixes #15774
    
    Also close the executor that wasn't closed
    
    (cherry picked from commit 1266f913678804d4cb7f2142458e87d6155f8bd4)
---
 .../pulsar/metadata/coordination/impl/CoordinationServiceImpl.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
index 4bda85c64fb..2b7e38b6c44 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata.coordination.impl;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -29,6 +30,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -43,6 +45,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 @SuppressWarnings("unchecked")
 public class CoordinationServiceImpl implements CoordinationService {
 
+    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);
     private final MetadataStoreExtended store;
 
     private final Map<Object, LockManager<?>> lockManagers = new ConcurrentHashMap<>();
@@ -69,9 +72,11 @@ public class CoordinationServiceImpl implements CoordinationService {
                 futures.add(lm.asyncClose());
             }
 
-            FutureUtils.collect(futures).join();
+            FutureUtils.collect(futures).get(CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
         } catch (CompletionException ce) {
             throw MetadataStoreException.unwrap(ce);
+        } finally {
+            executor.shutdownNow();
         }
     }