You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2023/10/11 16:22:53 UTC
[pulsar] branch master updated: [improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel and avoid recursive update error (#21282)
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aecdb03e0e6 [improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel and avoid recursive update error (#21282)
aecdb03e0e6 is described below
commit aecdb03e0e64605d60f03d9b76f99c1136677dff
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Oct 11 09:22:44 2023 -0700
[improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel and avoid recursive update error (#21282)
---
.../channel/ServiceUnitStateChannelImpl.java | 90 +++++++++++++---------
.../channel/ServiceUnitStateChannelTest.java | 10 +--
2 files changed, 59 insertions(+), 41 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index d71513652e9..f7e09a2bec5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -54,6 +54,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
@@ -67,6 +68,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -97,7 +99,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
@@ -125,9 +126,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
- private final ConcurrentOpenHashMap<String, CompletableFuture<String>> getOwnerRequests;
+ private final Map<String, CompletableFuture<String>> getOwnerRequests;
private final String lookupServiceAddress;
- private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> cleanupJobs;
+ private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;
private ExtensibleLoadManagerImpl loadManager;
private BrokerRegistry brokerRegistry;
@@ -204,9 +205,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
this.config = pulsar.getConfig();
this.lookupServiceAddress = pulsar.getLookupServiceAddress();
this.schema = Schema.JSON(ServiceUnitStateData.class);
- this.getOwnerRequests = ConcurrentOpenHashMap.<String,
- CompletableFuture<String>>newBuilder().build();
- this.cleanupJobs = ConcurrentOpenHashMap.<String, CompletableFuture<Void>>newBuilder().build();
+ this.getOwnerRequests = new ConcurrentHashMap<>();
+ this.cleanupJobs = new ConcurrentHashMap<>();
this.stateChangeListeners = new StateChangeListeners();
this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
* 1000;
@@ -826,20 +826,28 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
- return getOwnerRequests
- .computeIfAbsent(serviceUnit, k -> {
- CompletableFuture<String> future = new CompletableFuture<>();
- future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
- .whenComplete((v, e) -> {
- if (e != null) {
- getOwnerRequests.remove(serviceUnit, future);
- log.warn("Failed to getOwner for serviceUnit:{}",
- serviceUnit, e);
- }
+ var requested = new MutableObject<CompletableFuture<String>>();
+ try {
+ return getOwnerRequests
+ .computeIfAbsent(serviceUnit, k -> {
+ CompletableFuture<String> future = new CompletableFuture<>();
+ requested.setValue(future);
+ return future;
+ });
+ } finally {
+ var future = requested.getValue();
+ if (future != null) {
+ future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ getOwnerRequests.remove(serviceUnit, future);
+ log.warn("Failed to getOwner for serviceUnit:{}",
+ serviceUnit, e);
}
- );
- return future;
- });
+ }
+ );
+ }
+ }
}
private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
@@ -1114,24 +1122,34 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
private void scheduleCleanup(String broker, long delayInSecs) {
- cleanupJobs.computeIfAbsent(broker, k -> {
- Executor delayed = CompletableFuture
- .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor());
- totalInactiveBrokerCleanupScheduledCnt++;
- return CompletableFuture
- .runAsync(() -> {
- try {
- doCleanup(broker);
- } catch (Throwable e) {
- log.error("Failed to run the cleanup job for the broker {}, "
- + "totalCleanupErrorCnt:{}.",
- broker, totalCleanupErrorCnt.incrementAndGet(), e);
- } finally {
- cleanupJobs.remove(broker);
+ var scheduled = new MutableObject<CompletableFuture<Void>>();
+ try {
+ cleanupJobs.computeIfAbsent(broker, k -> {
+ Executor delayed = CompletableFuture
+ .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor());
+ totalInactiveBrokerCleanupScheduledCnt++;
+ var future = CompletableFuture
+ .runAsync(() -> {
+ try {
+ doCleanup(broker);
+ } catch (Throwable e) {
+ log.error("Failed to run the cleanup job for the broker {}, "
+ + "totalCleanupErrorCnt:{}.",
+ broker, totalCleanupErrorCnt.incrementAndGet(), e);
+ }
}
- }
- , delayed);
- });
+ , delayed);
+ scheduled.setValue(future);
+ return future;
+ });
+ } finally {
+ var future = scheduled.getValue();
+ if (future != null) {
+ future.whenComplete((v, ex) -> {
+ cleanupJobs.remove(broker);
+ });
+ }
+ }
log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.",
broker, delayInSecs, cleanupJobs.size());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index a226df53e12..f9893ea3f63 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -60,6 +60,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -88,7 +89,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.policies.data.TopicType;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -1558,9 +1558,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
}
- private static ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
+ private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
- return (ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>>)
+ return (ConcurrentHashMap<String, CompletableFuture<Optional<String>>>)
FieldUtils.readDeclaredField(channel,
"getOwnerRequests", true);
}
@@ -1577,9 +1577,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
FieldUtils.readField(channel, "lastMetadataSessionEventTimestamp", true);
}
- private static ConcurrentOpenHashMap<String, CompletableFuture<Void>> getCleanupJobs(
+ private static ConcurrentHashMap<String, CompletableFuture<Void>> getCleanupJobs(
ServiceUnitStateChannel channel) throws IllegalAccessException {
- return (ConcurrentOpenHashMap<String, CompletableFuture<Void>>)
+ return (ConcurrentHashMap<String, CompletableFuture<Void>>)
FieldUtils.readField(channel, "cleanupJobs", true);
}