You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/10 11:36:29 UTC

[pulsar] 03/05: [improve][broker]Remove unnecessary lock on the stats thread (#16983)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 643963a60fa1019180bf16d74e9ee7e0f4ef4939
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 9 10:57:20 2022 +0800

    [improve][broker]Remove unnecessary lock on the stats thread  (#16983)
    
    ---
    
    *Motivation*
    
    We found there has a block between the pulsar-ordered executor and
    the pulsar-stats-updater executor.
    
    The pulsar-ordered executor is trying to createManagedLedgerOffloader,
    and the pulsar-stats-updater is getting the compactor. Both them want
    to get the lock.
    
    We have an improvement about the `createManagedLedgerOffloader` before.
    https://github.com/apache/pulsar/pull/15883
    
    We are using `getCompactor(false)` for the stats related operations.
    The `getCompactor` is guarded by `synchronized`. Actually, the stats
    just want to get the current compactor without initializing it. We
    don't need to use `synchronized` to guard this operation.
    
    *Modification*
    
    Remove unnecessary `synchronized` on the `getCompactor` method.
---
 .../main/java/org/apache/pulsar/broker/PulsarService.java   | 13 ++++++++-----
 .../org/apache/pulsar/broker/service/BrokerService.java     |  9 +++------
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  7 +------
 .../broker/stats/prometheus/NamespaceStatsAggregator.java   |  8 +-------
 .../apache/pulsar/broker/stats/PrometheusMetricsTest.java   |  2 +-
 5 files changed, 14 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b5baa717093..6e9a9442a6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1419,16 +1419,19 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     public synchronized Compactor getCompactor() throws PulsarServerException {
-        return getCompactor(true);
-    }
-
-    public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
-        if (this.compactor == null && shouldInitialize) {
+        if (this.compactor == null) {
             this.compactor = newCompactor();
         }
         return this.compactor;
     }
 
+    // This method is used for metrics, which is allowed to as null
+    // Because it's no operation on the compactor, so let's remove the  synchronized on this method
+    // to avoid unnecessary lock competition.
+    public Compactor getNullableCompactor() {
+        return this.compactor;
+    }
+
     protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
         if (this.offloaderScheduler == null) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0a2e99004a8..e99fce6f68a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1990,12 +1990,9 @@ public class BrokerService implements Closeable {
         }
         topics.remove(topic);
 
-        try {
-            Compactor compactor = pulsar.getCompactor(false);
-            if (compactor != null) {
-                compactor.getStats().removeTopic(topic);
-            }
-        } catch (PulsarServerException ignore) {
+        Compactor compactor = pulsar.getNullableCompactor();
+        if (compactor != null) {
+            compactor.getStats().removeTopic(topic);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00e4bc01ec4..6673bedce23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1945,12 +1945,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private Optional<CompactorMXBean> getCompactorMXBean() {
-        Compactor compactor = null;
-        try {
-            compactor = brokerService.pulsar().getCompactor(false);
-        } catch (PulsarServerException ex) {
-            log.warn("get compactor error", ex);
-        }
+        Compactor compactor = brokerService.pulsar().getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index c3e8567de54..f444ad0542e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -103,12 +102,7 @@ public class NamespaceStatsAggregator {
     }
 
     private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
-        Compactor compactor = null;
-        try {
-            compactor = pulsar.getCompactor(false);
-        } catch (PulsarServerException e) {
-            log.error("get compactor error", e);
-        }
+        Compactor compactor = pulsar.getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index d9d1b25665c..c5ecb8d5bf6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1417,7 +1417,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                     .value(data)
                     .send();
         }
-        Compactor compactor = pulsar.getCompactor(true);
+        Compactor compactor = pulsar.getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);