You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2022/08/11 02:50:30 UTC

[pulsar] 02/02: [improve][broker]Remove unnecessary lock on the stats thread (#16983)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c1e176e7db6fbda83970b3cde3b104b37c8f14c3
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 9 10:57:20 2022 +0800

    [improve][broker]Remove unnecessary lock on the stats thread  (#16983)
    
    ---
    
    *Motivation*
    
    We found that the pulsar-ordered executor and the
    pulsar-stats-updater executor were blocking each other.
    
    The pulsar-ordered executor is calling `createManagedLedgerOffloader`,
    and the pulsar-stats-updater is getting the compactor. Both of them
    want to acquire the lock.
    
    We previously made an improvement to `createManagedLedgerOffloader`:
    https://github.com/apache/pulsar/pull/15883
    
    We are using `getCompactor(false)` for the stats-related operations.
    The `getCompactor` method is guarded by `synchronized`. However, the
    stats code only wants to get the current compactor without
    initializing it, so this operation does not need to be guarded by
    `synchronized`.
    
    *Modification*
    
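    Remove the unnecessary `synchronized` from the stats path: replace
    `getCompactor(boolean)` with a non-synchronized `getNullableCompactor()`
    that returns the current compactor (possibly null) without initializing it.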
    
    (cherry picked from commit 4d5ecba9394515e7dbf19fd01739c1e1dc90e5ec)
---
 .../main/java/org/apache/pulsar/broker/PulsarService.java   | 13 ++++++++-----
 .../org/apache/pulsar/broker/service/BrokerService.java     |  9 +++------
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  7 +------
 .../broker/stats/prometheus/NamespaceStatsAggregator.java   |  8 +-------
 .../apache/pulsar/broker/stats/PrometheusMetricsTest.java   |  2 +-
 5 files changed, 14 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 760cc83b5d6..459b1d9dc67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1329,16 +1329,19 @@ public class PulsarService implements AutoCloseable {
     }
 
     public synchronized Compactor getCompactor() throws PulsarServerException {
-        return getCompactor(true);
-    }
-
-    public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
-        if (this.compactor == null && shouldInitialize) {
+        if (this.compactor == null) {
             this.compactor = newCompactor();
         }
         return this.compactor;
     }
 
+    // This method is used for metrics and is allowed to return null.
+    // It only reads the current compactor and never initializes it, so it is not synchronized,
+    // which avoids unnecessary lock contention.
+    public Compactor getNullableCompactor() {
+        return this.compactor;
+    }
+
     protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
         if (this.offloaderScheduler == null) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e91e462f7df..34b126d6104 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1877,12 +1877,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
         topics.remove(topic);
 
-        try {
-            Compactor compactor = pulsar.getCompactor(false);
-            if (compactor != null) {
-                compactor.getStats().removeTopic(topic);
-            }
-        } catch (PulsarServerException ignore) {
+        Compactor compactor = pulsar.getNullableCompactor();
+        if (compactor != null) {
+            compactor.getStats().removeTopic(topic);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 19706585531..f8ea56e2c2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1967,12 +1967,7 @@ public class PersistentTopic extends AbstractTopic
     }
 
     private Optional<CompactorMXBean> getCompactorMXBean() {
-        Compactor compactor = null;
-        try {
-            compactor = brokerService.pulsar().getCompactor(false);
-        } catch (PulsarServerException ex) {
-            log.warn("get compactor error", ex);
-        }
+        Compactor compactor = brokerService.pulsar().getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 2a21a0b402a..945725f4b77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -100,12 +99,7 @@ public class NamespaceStatsAggregator {
     }
 
     private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
-        Compactor compactor = null;
-        try {
-            compactor = pulsar.getCompactor(false);
-        } catch (PulsarServerException e) {
-            log.error("get compactor error", e);
-        }
+        Compactor compactor = pulsar.getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c0383cd6e9f..df765d65295 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1220,7 +1220,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
         ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
-        Compactor compactor = pulsar.getCompactor(true);
+        Compactor compactor = pulsar.getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);