You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/25 05:23:43 UTC

[pulsar] branch master updated: Avoid making copies of internal maps when iterating (#10691)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed2dfc9  Avoid making copies of internal maps when iterating (#10691)
ed2dfc9 is described below

commit ed2dfc91226cf0d663cbeea052e923a125812fca
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon May 24 22:23:04 2021 -0700

    Avoid making copies of internal maps when iterating (#10691)
    
    ### Motivation
    
    In several places in the code, when iterating over the custom hashmaps, we are taking a copy of the map. This was done whenever the iteration could end up modifying the map: a non-reentrant mutex was held during the iteration, so any modification from within the iteration would lead to a deadlock.
    
    Since the behavior was changed in #9787 to not hold the section mutex during the iteration, there is no longer any need to make a copy of the maps.
---
 .../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java     | 5 ++---
 .../java/org/apache/pulsar/broker/namespace/NamespaceService.java | 4 ++--
 .../org/apache/pulsar/broker/service/BacklogQuotaManager.java     | 8 ++++----
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 4 ++--
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 4 ++--
 5 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index abac85b..8c80e38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -46,7 +46,6 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.Message;
@@ -313,13 +312,13 @@ public class BrokersBase extends PulsarWebResource {
         // create non-partitioned topic manually and close the previous reader if present.
         try {
             pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
-                for (Subscription value : t.getSubscriptions().values()) {
+                t.getSubscriptions().forEach((__, value) -> {
                     try {
                         value.deleteForcefully();
                     } catch (Exception e) {
                         LOG.warn("Failed to delete previous subscription {} for health check", value.getName(), e);
                     }
-                }
+                });
             });
         } catch (Exception e) {
             LOG.warn("Failed to try to delete subscriptions for health check", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e338cd9..617c027 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1237,8 +1237,8 @@ public class NamespaceService implements AutoCloseable {
                         synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
                             if (pulsar.getBrokerService().getMultiLayerTopicMap()
                                     .containsKey(namespaceName.toString())) {
-                                pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
-                                        .forEach(bundle -> {
+                                pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString())
+                                        .forEach((__, bundle) -> {
                                             bundle.forEach((topicName, topic) -> {
                                                 if (topic instanceof NonPersistentTopic
                                                         && ((NonPersistentTopic) topic).isActive()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 3be3a69..cb29461 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -31,7 +31,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -226,9 +225,10 @@ public class BacklogQuotaManager {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target);
             }
-            for (PersistentSubscription subscription : persistentTopic.getSubscriptions().values()) {
-                subscription.getExpiryMonitor().expireMessages(target);
-            }
+
+            persistentTopic.getSubscriptions().forEach((__, subscription) ->
+                    subscription.getExpiryMonitor().expireMessages(target)
+            );
         } else {
             // If disabled precise time based backlog quota check, will try to remove whole ledger from cursor's backlog
             Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1292ace..757f887 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -268,14 +268,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         cnxsPerThread.get().remove(this);
 
         // Connection is gone, close the producers immediately
-        producers.values().forEach((producerFuture) -> {
+        producers.forEach((__, producerFuture) -> {
             if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
                 Producer producer = producerFuture.getNow(null);
                 producer.closeNow(true);
             }
         });
 
-        consumers.values().forEach((consumerFuture) -> {
+        consumers.forEach((__, consumerFuture) -> {
             Consumer consumer;
             if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
                 consumer = consumerFuture.getNow(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 90830f9..92986e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1328,8 +1328,8 @@ public class PersistentTopic extends AbstractTopic
             int messageTtlInSeconds = getMessageTTL();
 
             if (messageTtlInSeconds != 0) {
-                subscriptions.values().forEach((sub) -> sub.expireMessages(messageTtlInSeconds));
-                replicators.values().forEach((replicator)
+                subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
+                replicators.forEach((__, replicator)
                         -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
             }
         } catch (Exception e) {