You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2022/08/16 02:33:22 UTC

[pulsar] branch branch-2.9 updated (e434a531a11 -> 0d36b6bd50e)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from e434a531a11 [fix][broker]remove exception log when access status.html (#17025)
     new 1c6dc30625c Avoid contended synchronized block on topic load (#15883)
     new 0d36b6bd50e [improve][broker]Remove unnecessary lock on the stats thread  (#16983)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    | 55 ++++++++++++----------
 .../pulsar/broker/service/BrokerService.java       |  9 ++--
 .../broker/service/persistent/PersistentTopic.java |  7 +--
 .../stats/prometheus/NamespaceStatsAggregator.java |  8 +---
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  2 +-
 5 files changed, 37 insertions(+), 44 deletions(-)


[pulsar] 01/02: Avoid contended synchronized block on topic load (#15883)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1c6dc30625c80c546970d6394275d1d2d2a7631c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 2 08:34:55 2022 -0700

    Avoid contended synchronized block on topic load (#15883)
    
    (cherry picked from commit 7d2fdea7749d72b58def4045be3f295e0ee4f04d)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 42 ++++++++++++----------
 1 file changed, 23 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b60c8dc4845..b5efe4ed970 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1213,33 +1213,37 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         });
     }
 
-    public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
+    public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
             throws PulsarServerException {
         try {
             if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
-                        offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
-
-                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
-                        offloadPolicies.getManagedLedgerOffloadDriver());
-                try {
-                    return offloaderFactory.create(
-                        offloadPolicies,
-                        ImmutableMap.of(
-                            LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
-                            LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(),
-                            LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName()
-                        ),
-                        schemaStorage,
-                        getOffloaderScheduler(offloadPolicies));
-                } catch (IOException ioe) {
-                    throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
+                synchronized (this) {
+                    Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
+                            offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
+
+                    LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
+                            offloadPolicies.getManagedLedgerOffloadDriver());
+                    try {
+                        return offloaderFactory.create(
+                                offloadPolicies,
+                                ImmutableMap.of(
+                                        LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
+                                        PulsarVersion.getVersion(),
+                                        LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
+                                        PulsarVersion.getGitSha(),
+                                        LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(),
+                                        config.getClusterName()
+                                ),
+                                schemaStorage, getOffloaderScheduler(offloadPolicies));
+                    } catch (IOException ioe) {
+                        throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
+                    }
                 }
             } else {
-                LOG.info("No ledger offloader configured, using NULL instance");
+                LOG.debug("No ledger offloader configured, using NULL instance");
                 return NullLedgerOffloader.INSTANCE;
             }
         } catch (Throwable t) {


[pulsar] 02/02: [improve][broker]Remove unnecessary lock on the stats thread (#16983)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0d36b6bd50e34d034e198351650a172001dc7116
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 9 10:57:20 2022 +0800

    [improve][broker]Remove unnecessary lock on the stats thread  (#16983)
    
    ---
    
    *Motivation*
    
    We found that the pulsar-ordered executor and the pulsar-stats-updater
    executor block each other.
    
    The pulsar-ordered executor is trying to createManagedLedgerOffloader,
    and the pulsar-stats-updater is getting the compactor. Both of them
    want to acquire the same lock.
    
    We already improved `createManagedLedgerOffloader` in an earlier change:
    https://github.com/apache/pulsar/pull/15883
    
    We are using `getCompactor(false)` for the stats-related operations.
    `getCompactor` is guarded by `synchronized`. However, the stats code
    only wants to read the current compactor without initializing it, so
    we don't need `synchronized` to guard this operation.
    
    *Modification*
    
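    Remove the unnecessary `synchronized` from the stats path: replace
    `getCompactor(false)` with a new unsynchronized `getNullableCompactor`
    method. `getCompactor()` itself remains synchronized.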
    
    (cherry picked from commit 4d5ecba9394515e7dbf19fd01739c1e1dc90e5ec)
---
 .../main/java/org/apache/pulsar/broker/PulsarService.java   | 13 ++++++++-----
 .../org/apache/pulsar/broker/service/BrokerService.java     |  9 +++------
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  7 +------
 .../broker/stats/prometheus/NamespaceStatsAggregator.java   |  8 +-------
 .../apache/pulsar/broker/stats/PrometheusMetricsTest.java   |  2 +-
 5 files changed, 14 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b5efe4ed970..9211a4efa45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1312,16 +1312,19 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     public synchronized Compactor getCompactor() throws PulsarServerException {
-        return getCompactor(true);
-    }
-
-    public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
-        if (this.compactor == null && shouldInitialize) {
+        if (this.compactor == null) {
             this.compactor = newCompactor();
         }
         return this.compactor;
     }
 
+    // This method is used for metrics, which are allowed to see a null compactor.
+    // Because it performs no operation on the compactor, this method is not synchronized,
+    // which avoids unnecessary lock contention.
+    public Compactor getNullableCompactor() {
+        return this.compactor;
+    }
+
     protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
         if (this.offloaderScheduler == null) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bc6dbb73750..1fbc19657f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1881,12 +1881,9 @@ public class BrokerService implements Closeable {
         }
         topics.remove(topic);
 
-        try {
-            Compactor compactor = pulsar.getCompactor(false);
-            if (compactor != null) {
-                compactor.getStats().removeTopic(topic);
-            }
-        } catch (PulsarServerException ignore) {
+        Compactor compactor = pulsar.getNullableCompactor();
+        if (compactor != null) {
+            compactor.getStats().removeTopic(topic);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5f2fd5ea1d9..99f12d1b1a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1986,12 +1986,7 @@ public class PersistentTopic extends AbstractTopic
     }
 
     private Optional<CompactorMXBean> getCompactorMXBean() {
-        Compactor compactor = null;
-        try {
-            compactor = brokerService.pulsar().getCompactor(false);
-        } catch (PulsarServerException ex) {
-            log.warn("get compactor error", ex);
-        }
+        Compactor compactor = brokerService.pulsar().getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index e70f092af80..16e438e2a2e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -100,12 +99,7 @@ public class NamespaceStatsAggregator {
     }
 
     private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
-        Compactor compactor = null;
-        try {
-            compactor = pulsar.getCompactor(false);
-        } catch (PulsarServerException e) {
-            log.error("get compactor error", e);
-        }
+        Compactor compactor = pulsar.getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index eae58b67c6a..f28412ea751 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1321,7 +1321,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
         ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
-        Compactor compactor = pulsar.getCompactor(true);
+        Compactor compactor = pulsar.getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);