You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2022/08/11 02:50:28 UTC

[pulsar] branch branch-2.8 updated (723149ec440 -> c1e176e7db6)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 723149ec440 Fix client memory limit currentUsage leak and semaphore release duplicated in ProducerImpl (#16985)
     new b047e8f9779 Avoid contended synchronized block on topic load (#15883)
     new c1e176e7db6 [improve][broker]Remove unnecessary lock on the stats thread  (#16983)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    | 54 +++++++++++++---------
 .../pulsar/broker/service/BrokerService.java       |  9 ++--
 .../broker/service/persistent/PersistentTopic.java |  7 +--
 .../stats/prometheus/NamespaceStatsAggregator.java |  8 +---
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  2 +-
 5 files changed, 37 insertions(+), 43 deletions(-)


[pulsar] 01/02: Avoid contended synchronized block on topic load (#15883)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b047e8f97797f18c9f18ab271e16551fab1b0ab4
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 2 08:34:55 2022 -0700

    Avoid contended synchronized block on topic load (#15883)
    
    (cherry picked from commit 7d2fdea7749d72b58def4045be3f295e0ee4f04d)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 41 ++++++++++++----------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 49440be5452..760cc83b5d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1222,33 +1222,38 @@ public class PulsarService implements AutoCloseable {
         });
     }
 
-    public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
+    public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
             throws PulsarServerException {
         try {
             if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
-                        offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
 
-                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
-                        offloadPolicies.getManagedLedgerOffloadDriver());
-                try {
-                    return offloaderFactory.create(
-                        offloadPolicies,
-                        ImmutableMap.of(
-                            LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
-                            LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(),
-                            LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName()
-                        ),
-                        schemaStorage,
-                        getOffloaderScheduler(offloadPolicies));
-                } catch (IOException ioe) {
-                    throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
+                synchronized (this) {
+                    Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
+                            offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
+
+                    LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
+                            offloadPolicies.getManagedLedgerOffloadDriver());
+                    try {
+                        return offloaderFactory.create(
+                                offloadPolicies,
+                                ImmutableMap.of(
+                                        LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
+                                        PulsarVersion.getVersion(),
+                                        LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
+                                        PulsarVersion.getGitSha(),
+                                        LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(),
+                                        config.getClusterName()
+                                ),
+                                schemaStorage, getOffloaderScheduler(offloadPolicies), this.offloaderStats);
+                    } catch (IOException ioe) {
+                        throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
+                    }
                 }
             } else {
-                LOG.info("No ledger offloader configured, using NULL instance");
+                LOG.debug("No ledger offloader configured, using NULL instance");
                 return NullLedgerOffloader.INSTANCE;
             }
         } catch (Throwable t) {


[pulsar] 02/02: [improve][broker]Remove unnecessary lock on the stats thread (#16983)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c1e176e7db6fbda83970b3cde3b104b37c8f14c3
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 9 10:57:20 2022 +0800

    [improve][broker]Remove unnecessary lock on the stats thread  (#16983)
    
    ---
    
    *Motivation*
    
    We found that the pulsar-ordered executor and the
    pulsar-stats-updater executor block each other.
    
    The pulsar-ordered executor is trying to createManagedLedgerOffloader,
    and the pulsar-stats-updater is getting the compactor. Both of them
    need to acquire the same lock.
    
    `createManagedLedgerOffloader` was already improved in an earlier change:
    https://github.com/apache/pulsar/pull/15883
    
    We are using `getCompactor(false)` for the stats-related operations.
    `getCompactor` is guarded by `synchronized`. However, the stats code
    only wants to read the current compactor without initializing it, so
    this operation does not need to be guarded by `synchronized`.
    
    *Modification*
    
    Stop taking the `synchronized` lock when stats code reads the compactor:
    replace `getCompactor(false)` with a new unsynchronized
    `getNullableCompactor` method.
    
    (cherry picked from commit 4d5ecba9394515e7dbf19fd01739c1e1dc90e5ec)
---
 .../main/java/org/apache/pulsar/broker/PulsarService.java   | 13 ++++++++-----
 .../org/apache/pulsar/broker/service/BrokerService.java     |  9 +++------
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  7 +------
 .../broker/stats/prometheus/NamespaceStatsAggregator.java   |  8 +-------
 .../apache/pulsar/broker/stats/PrometheusMetricsTest.java   |  2 +-
 5 files changed, 14 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 760cc83b5d6..459b1d9dc67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1329,16 +1329,19 @@ public class PulsarService implements AutoCloseable {
     }
 
     public synchronized Compactor getCompactor() throws PulsarServerException {
-        return getCompactor(true);
-    }
-
-    public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
-        if (this.compactor == null && shouldInitialize) {
+        if (this.compactor == null) {
             this.compactor = newCompactor();
         }
         return this.compactor;
     }
 
+    // This method is used for metrics, which is allowed to as null
+    // Because it's no operation on the compactor, so let's remove the  synchronized on this method
+    // to avoid unnecessary lock competition.
+    public Compactor getNullableCompactor() {
+        return this.compactor;
+    }
+
     protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
         if (this.offloaderScheduler == null) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e91e462f7df..34b126d6104 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1877,12 +1877,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
         topics.remove(topic);
 
-        try {
-            Compactor compactor = pulsar.getCompactor(false);
-            if (compactor != null) {
-                compactor.getStats().removeTopic(topic);
-            }
-        } catch (PulsarServerException ignore) {
+        Compactor compactor = pulsar.getNullableCompactor();
+        if (compactor != null) {
+            compactor.getStats().removeTopic(topic);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 19706585531..f8ea56e2c2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1967,12 +1967,7 @@ public class PersistentTopic extends AbstractTopic
     }
 
     private Optional<CompactorMXBean> getCompactorMXBean() {
-        Compactor compactor = null;
-        try {
-            compactor = brokerService.pulsar().getCompactor(false);
-        } catch (PulsarServerException ex) {
-            log.warn("get compactor error", ex);
-        }
+        Compactor compactor = brokerService.pulsar().getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 2a21a0b402a..945725f4b77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -100,12 +99,7 @@ public class NamespaceStatsAggregator {
     }
 
     private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
-        Compactor compactor = null;
-        try {
-            compactor = pulsar.getCompactor(false);
-        } catch (PulsarServerException e) {
-            log.error("get compactor error", e);
-        }
+        Compactor compactor = pulsar.getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c0383cd6e9f..df765d65295 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1220,7 +1220,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
         ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
-        Compactor compactor = pulsar.getCompactor(true);
+        Compactor compactor = pulsar.getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);