You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 08:16:56 UTC
[pulsar] 02/06: [Broker] Add timeout to closing CoordinationServiceImpl (#15777)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 774e22c2f4726709f2cb0246054baae5a8218d59
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue May 31 17:12:54 2022 +0300
[Broker] Add timeout to closing CoordinationServiceImpl (#15777)
Fixes #15774
Also shut down the executor, which was previously left running when the service was closed
(cherry picked from commit 1266f913678804d4cb7f2142458e87d6155f8bd4)
---
.../pulsar/metadata/coordination/impl/CoordinationServiceImpl.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
index 4bda85c64fb..2b7e38b6c44 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.metadata.coordination.impl;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -29,6 +30,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -43,6 +45,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@SuppressWarnings("unchecked")
public class CoordinationServiceImpl implements CoordinationService {
+ private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);
private final MetadataStoreExtended store;
private final Map<Object, LockManager<?>> lockManagers = new ConcurrentHashMap<>();
@@ -69,9 +72,11 @@ public class CoordinationServiceImpl implements CoordinationService {
futures.add(lm.asyncClose());
}
- FutureUtils.collect(futures).join();
+ FutureUtils.collect(futures).get(CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
} catch (CompletionException ce) {
throw MetadataStoreException.unwrap(ce);
+ } finally {
+ executor.shutdownNow();
}
}