You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/10 11:36:29 UTC
[pulsar] 03/05: [improve][broker]Remove unnecessary lock on the stats thread (#16983)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 643963a60fa1019180bf16d74e9ee7e0f4ef4939
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 9 10:57:20 2022 +0800
[improve][broker]Remove unnecessary lock on the stats thread (#16983)
---
*Motivation*
We found lock contention between the pulsar-ordered executor and
the pulsar-stats-updater executor.
The pulsar-ordered executor is calling `createManagedLedgerOffloader`,
while the pulsar-stats-updater is getting the compactor. Both of them
need to acquire the lock.
`createManagedLedgerOffloader` was already improved in an earlier change:
https://github.com/apache/pulsar/pull/15883
We use `getCompactor(false)` for stats-related operations, and
`getCompactor` is guarded by `synchronized`. However, the stats code
only wants to read the current compactor without initializing it, so
this operation does not need to be guarded by `synchronized`.
*Modification*
Remove the unnecessary `synchronized` from the stats path: replace `getCompactor(false)` with a new unsynchronized `getNullableCompactor()` method.
---
.../main/java/org/apache/pulsar/broker/PulsarService.java | 13 ++++++++-----
.../org/apache/pulsar/broker/service/BrokerService.java | 9 +++------
.../pulsar/broker/service/persistent/PersistentTopic.java | 7 +------
.../broker/stats/prometheus/NamespaceStatsAggregator.java | 8 +-------
.../apache/pulsar/broker/stats/PrometheusMetricsTest.java | 2 +-
5 files changed, 14 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b5baa717093..6e9a9442a6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1419,16 +1419,19 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
public synchronized Compactor getCompactor() throws PulsarServerException {
- return getCompactor(true);
- }
-
- public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
- if (this.compactor == null && shouldInitialize) {
+ if (this.compactor == null) {
this.compactor = newCompactor();
}
return this.compactor;
}
+ // This method is used for metrics, where a null result is acceptable.
+ // Because it only reads the compactor field and never initializes it, it is deliberately
+ // not synchronized, which avoids unnecessary lock contention.
+ public Compactor getNullableCompactor() {
+ return this.compactor;
+ }
+
protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0a2e99004a8..e99fce6f68a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1990,12 +1990,9 @@ public class BrokerService implements Closeable {
}
topics.remove(topic);
- try {
- Compactor compactor = pulsar.getCompactor(false);
- if (compactor != null) {
- compactor.getStats().removeTopic(topic);
- }
- } catch (PulsarServerException ignore) {
+ Compactor compactor = pulsar.getNullableCompactor();
+ if (compactor != null) {
+ compactor.getStats().removeTopic(topic);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00e4bc01ec4..6673bedce23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1945,12 +1945,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
private Optional<CompactorMXBean> getCompactorMXBean() {
- Compactor compactor = null;
- try {
- compactor = brokerService.pulsar().getCompactor(false);
- } catch (PulsarServerException ex) {
- log.warn("get compactor error", ex);
- }
+ Compactor compactor = brokerService.pulsar().getNullableCompactor();
return Optional.ofNullable(compactor).map(c -> c.getStats());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index c3e8567de54..f444ad0542e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
-import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -103,12 +102,7 @@ public class NamespaceStatsAggregator {
}
private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
- Compactor compactor = null;
- try {
- compactor = pulsar.getCompactor(false);
- } catch (PulsarServerException e) {
- log.error("get compactor error", e);
- }
+ Compactor compactor = pulsar.getNullableCompactor();
return Optional.ofNullable(compactor).map(c -> c.getStats());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index d9d1b25665c..c5ecb8d5bf6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1417,7 +1417,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
.value(data)
.send();
}
- Compactor compactor = pulsar.getCompactor(true);
+ Compactor compactor = pulsar.getCompactor();
compactor.compact(topicName).get();
statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);