You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/25 05:23:43 UTC
[pulsar] branch master updated: Avoid making copies of internal
maps when iterating (#10691)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ed2dfc9 Avoid making copies of internal maps when iterating (#10691)
ed2dfc9 is described below
commit ed2dfc91226cf0d663cbeea052e923a125812fca
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon May 24 22:23:04 2021 -0700
Avoid making copies of internal maps when iterating (#10691)
### Motivation
In several places in the code, when iterating over the custom hashmaps, we first make a copy of the map. This was done wherever the iteration could end up modifying the map: a non-reentrant mutex was held during the iteration, so any modification would lead to a deadlock.
Since the behavior was changed in #9787 to not hold the section mutex during the iteration, there is no longer any need to make a copy of the maps.
---
.../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 5 ++---
.../java/org/apache/pulsar/broker/namespace/NamespaceService.java | 4 ++--
.../org/apache/pulsar/broker/service/BacklogQuotaManager.java | 8 ++++----
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 4 ++--
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 4 ++--
5 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index abac85b..8c80e38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -46,7 +46,6 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Message;
@@ -313,13 +312,13 @@ public class BrokersBase extends PulsarWebResource {
// create non-partitioned topic manually and close the previous reader if present.
try {
pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
- for (Subscription value : t.getSubscriptions().values()) {
+ t.getSubscriptions().forEach((__, value) -> {
try {
value.deleteForcefully();
} catch (Exception e) {
LOG.warn("Failed to delete previous subscription {} for health check", value.getName(), e);
}
- }
+ });
});
} catch (Exception e) {
LOG.warn("Failed to try to delete subscriptions for health check", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e338cd9..617c027 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1237,8 +1237,8 @@ public class NamespaceService implements AutoCloseable {
synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
if (pulsar.getBrokerService().getMultiLayerTopicMap()
.containsKey(namespaceName.toString())) {
- pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
- .forEach(bundle -> {
+ pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString())
+ .forEach((__, bundle) -> {
bundle.forEach((topicName, topic) -> {
if (topic instanceof NonPersistentTopic
&& ((NonPersistentTopic) topic).isActive()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 3be3a69..cb29461 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -31,7 +31,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -226,9 +225,10 @@ public class BacklogQuotaManager {
if (log.isDebugEnabled()) {
log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target);
}
- for (PersistentSubscription subscription : persistentTopic.getSubscriptions().values()) {
- subscription.getExpiryMonitor().expireMessages(target);
- }
+
+ persistentTopic.getSubscriptions().forEach((__, subscription) ->
+ subscription.getExpiryMonitor().expireMessages(target)
+ );
} else {
// If disabled precise time based backlog quota check, will try to remove whole ledger from cursor's backlog
Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1292ace..757f887 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -268,14 +268,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
cnxsPerThread.get().remove(this);
// Connection is gone, close the producers immediately
- producers.values().forEach((producerFuture) -> {
+ producers.forEach((__, producerFuture) -> {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow(true);
}
});
- consumers.values().forEach((consumerFuture) -> {
+ consumers.forEach((__, consumerFuture) -> {
Consumer consumer;
if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumer = consumerFuture.getNow(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 90830f9..92986e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1328,8 +1328,8 @@ public class PersistentTopic extends AbstractTopic
int messageTtlInSeconds = getMessageTTL();
if (messageTtlInSeconds != 0) {
- subscriptions.values().forEach((sub) -> sub.expireMessages(messageTtlInSeconds));
- replicators.values().forEach((replicator)
+ subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
+ replicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
}
} catch (Exception e) {