You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2022/08/11 02:50:30 UTC
[pulsar] 02/02: [improve][broker]Remove unnecessary lock on the stats thread (#16983)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c1e176e7db6fbda83970b3cde3b104b37c8f14c3
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 9 10:57:20 2022 +0800
[improve][broker]Remove unnecessary lock on the stats thread (#16983)
---
*Motivation*
We found that the pulsar-ordered executor and the pulsar-stats-updater
executor can block each other.
The pulsar-ordered executor is calling `createManagedLedgerOffloader`,
and the pulsar-stats-updater is getting the compactor. Both of them
need to acquire the same lock.
We already improved `createManagedLedgerOffloader` in an earlier change:
https://github.com/apache/pulsar/pull/15883
We are using `getCompactor(false)` for the stats-related operations.
The `getCompactor` method is guarded by `synchronized`. However, the stats
code only wants to get the current compactor without initializing it, so
we don't need `synchronized` to guard this operation.
*Modification*
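Remove the unnecessary `synchronized` from the stats path: stats callers now use a
new unsynchronized `getNullableCompactor` method instead of `getCompactor(false)`.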
(cherry picked from commit 4d5ecba9394515e7dbf19fd01739c1e1dc90e5ec)
---
.../main/java/org/apache/pulsar/broker/PulsarService.java | 13 ++++++++-----
.../org/apache/pulsar/broker/service/BrokerService.java | 9 +++------
.../pulsar/broker/service/persistent/PersistentTopic.java | 7 +------
.../broker/stats/prometheus/NamespaceStatsAggregator.java | 8 +-------
.../apache/pulsar/broker/stats/PrometheusMetricsTest.java | 2 +-
5 files changed, 14 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 760cc83b5d6..459b1d9dc67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1329,16 +1329,19 @@ public class PulsarService implements AutoCloseable {
}
public synchronized Compactor getCompactor() throws PulsarServerException {
- return getCompactor(true);
- }
-
- public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
- if (this.compactor == null && shouldInitialize) {
+ if (this.compactor == null) {
this.compactor = newCompactor();
}
return this.compactor;
}
+ // This method is used for metrics and is allowed to return null.
+ // It only reads the current compactor and never initializes it, so it is left unsynchronized
+ // to avoid unnecessary lock contention.
+ public Compactor getNullableCompactor() {
+ return this.compactor;
+ }
+
protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e91e462f7df..34b126d6104 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1877,12 +1877,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
topics.remove(topic);
- try {
- Compactor compactor = pulsar.getCompactor(false);
- if (compactor != null) {
- compactor.getStats().removeTopic(topic);
- }
- } catch (PulsarServerException ignore) {
+ Compactor compactor = pulsar.getNullableCompactor();
+ if (compactor != null) {
+ compactor.getStats().removeTopic(topic);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 19706585531..f8ea56e2c2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1967,12 +1967,7 @@ public class PersistentTopic extends AbstractTopic
}
private Optional<CompactorMXBean> getCompactorMXBean() {
- Compactor compactor = null;
- try {
- compactor = brokerService.pulsar().getCompactor(false);
- } catch (PulsarServerException ex) {
- log.warn("get compactor error", ex);
- }
+ Compactor compactor = brokerService.pulsar().getNullableCompactor();
return Optional.ofNullable(compactor).map(c -> c.getStats());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 2a21a0b402a..945725f4b77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
-import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -100,12 +99,7 @@ public class NamespaceStatsAggregator {
}
private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
- Compactor compactor = null;
- try {
- compactor = pulsar.getCompactor(false);
- } catch (PulsarServerException e) {
- log.error("get compactor error", e);
- }
+ Compactor compactor = pulsar.getNullableCompactor();
return Optional.ofNullable(compactor).map(c -> c.getStats());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c0383cd6e9f..df765d65295 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1220,7 +1220,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
}
ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
- Compactor compactor = pulsar.getCompactor(true);
+ Compactor compactor = pulsar.getCompactor();
compactor.compact(topicName).get();
statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);