You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by he...@apache.org on 2023/10/11 16:22:53 UTC

[pulsar] branch master updated: [improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel and avoid recursive update error (#21282)

This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aecdb03e0e6 [improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel and avoid recursive update error (#21282)
aecdb03e0e6 is described below

commit aecdb03e0e64605d60f03d9b76f99c1136677dff
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Oct 11 09:22:44 2023 -0700

    [improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel and avoid recursive update error (#21282)
---
 .../channel/ServiceUnitStateChannelImpl.java       | 90 +++++++++++++---------
 .../channel/ServiceUnitStateChannelTest.java       | 10 +--
 2 files changed, 59 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index d71513652e9..f7e09a2bec5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -54,6 +54,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
@@ -67,6 +68,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.pulsar.PulsarClusterMetadataSetup;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -97,7 +99,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
@@ -125,9 +126,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     private final PulsarService pulsar;
     private final ServiceConfiguration config;
     private final Schema<ServiceUnitStateData> schema;
-    private final ConcurrentOpenHashMap<String, CompletableFuture<String>> getOwnerRequests;
+    private final Map<String, CompletableFuture<String>> getOwnerRequests;
     private final String lookupServiceAddress;
-    private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> cleanupJobs;
+    private final Map<String, CompletableFuture<Void>> cleanupJobs;
     private final StateChangeListeners stateChangeListeners;
     private ExtensibleLoadManagerImpl loadManager;
     private BrokerRegistry brokerRegistry;
@@ -204,9 +205,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         this.config = pulsar.getConfig();
         this.lookupServiceAddress = pulsar.getLookupServiceAddress();
         this.schema = Schema.JSON(ServiceUnitStateData.class);
-        this.getOwnerRequests = ConcurrentOpenHashMap.<String,
-                CompletableFuture<String>>newBuilder().build();
-        this.cleanupJobs = ConcurrentOpenHashMap.<String, CompletableFuture<Void>>newBuilder().build();
+        this.getOwnerRequests = new ConcurrentHashMap<>();
+        this.cleanupJobs = new ConcurrentHashMap<>();
         this.stateChangeListeners = new StateChangeListeners();
         this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
                 * 1000;
@@ -826,20 +826,28 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     }
 
     private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
-        return getOwnerRequests
-                .computeIfAbsent(serviceUnit, k -> {
-                    CompletableFuture<String> future = new CompletableFuture<>();
-                    future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
-                            .whenComplete((v, e) -> {
-                                        if (e != null) {
-                                            getOwnerRequests.remove(serviceUnit, future);
-                                            log.warn("Failed to getOwner for serviceUnit:{}",
-                                                    serviceUnit, e);
-                                        }
+        var requested = new MutableObject<CompletableFuture<String>>();
+        try {
+            return getOwnerRequests
+                    .computeIfAbsent(serviceUnit, k -> {
+                        CompletableFuture<String> future = new CompletableFuture<>();
+                        requested.setValue(future);
+                        return future;
+                    });
+        } finally {
+            var future = requested.getValue();
+            if (future != null) {
+                future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
+                        .whenComplete((v, e) -> {
+                                    if (e != null) {
+                                        getOwnerRequests.remove(serviceUnit, future);
+                                        log.warn("Failed to getOwner for serviceUnit:{}",
+                                                serviceUnit, e);
                                     }
-                            );
-                    return future;
-                });
+                                }
+                        );
+            }
+        }
     }
 
     private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
@@ -1114,24 +1122,34 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     }
 
     private void scheduleCleanup(String broker, long delayInSecs) {
-        cleanupJobs.computeIfAbsent(broker, k -> {
-            Executor delayed = CompletableFuture
-                    .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor());
-            totalInactiveBrokerCleanupScheduledCnt++;
-            return CompletableFuture
-                    .runAsync(() -> {
-                                try {
-                                    doCleanup(broker);
-                                } catch (Throwable e) {
-                                    log.error("Failed to run the cleanup job for the broker {}, "
-                                                    + "totalCleanupErrorCnt:{}.",
-                                            broker, totalCleanupErrorCnt.incrementAndGet(), e);
-                                } finally {
-                                    cleanupJobs.remove(broker);
+        var scheduled = new MutableObject<CompletableFuture<Void>>();
+        try {
+            cleanupJobs.computeIfAbsent(broker, k -> {
+                Executor delayed = CompletableFuture
+                        .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor());
+                totalInactiveBrokerCleanupScheduledCnt++;
+                var future = CompletableFuture
+                        .runAsync(() -> {
+                                    try {
+                                        doCleanup(broker);
+                                    } catch (Throwable e) {
+                                        log.error("Failed to run the cleanup job for the broker {}, "
+                                                        + "totalCleanupErrorCnt:{}.",
+                                                broker, totalCleanupErrorCnt.incrementAndGet(), e);
+                                    }
                                 }
-                            }
-                            , delayed);
-        });
+                                , delayed);
+                scheduled.setValue(future);
+                return future;
+            });
+        } finally {
+            var future = scheduled.getValue();
+            if (future != null) {
+                future.whenComplete((v, ex) -> {
+                    cleanupJobs.remove(broker);
+                });
+            }
+        }
 
         log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.",
                 broker, delayInSecs, cleanupJobs.size());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index a226df53e12..f9893ea3f63 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -88,7 +89,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -1558,9 +1558,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
     }
 
 
-    private static ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
+    private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
             ServiceUnitStateChannel channel) throws IllegalAccessException {
-        return (ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>>)
+        return (ConcurrentHashMap<String, CompletableFuture<Optional<String>>>)
                 FieldUtils.readDeclaredField(channel,
                         "getOwnerRequests", true);
     }
@@ -1577,9 +1577,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
                 FieldUtils.readField(channel, "lastMetadataSessionEventTimestamp", true);
     }
 
-    private static ConcurrentOpenHashMap<String, CompletableFuture<Void>> getCleanupJobs(
+    private static ConcurrentHashMap<String, CompletableFuture<Void>> getCleanupJobs(
             ServiceUnitStateChannel channel) throws IllegalAccessException {
-        return (ConcurrentOpenHashMap<String, CompletableFuture<Void>>)
+        return (ConcurrentHashMap<String, CompletableFuture<Void>>)
                 FieldUtils.readField(channel, "cleanupJobs", true);
     }