You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:06:07 UTC
[pulsar] 25/31: [fix][broker] Fix to avoid TopicStatsImpl NPE even if producerName is null (#15502)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d23e251a9b3070d2c7716afb01170667957588ba
Author: Yuri Mizushima <yu...@yahoo-corp.jp>
AuthorDate: Sat May 14 14:40:46 2022 +0900
[fix][broker] Fix to avoid TopicStatsImpl NPE even if producerName is null (#15502)
(cherry picked from commit 7bdfa3a3e5e71c44a174b28d0a843fb6730865fa)
---
.../org/apache/pulsar/broker/service/Producer.java | 2 +-
.../data/stats/NonPersistentTopicStatsImpl.java | 23 +++++++------
.../common/policies/data/stats/TopicStatsImpl.java | 13 ++++---
.../NonPersistentPartitionedTopicStatsTest.java | 40 ++++++++++++++++++++++
.../policies/data/PersistentTopicStatsTest.java | 38 ++++++++++++++++++++
5 files changed, 101 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 99d21db569b..f8591a8447a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -128,7 +128,7 @@ public class Producer {
stats.setClientVersion(cnx.getClientVersion());
stats.setProducerName(producerName);
stats.producerId = producerId;
- if (serviceConf.isAggregatePublisherStatsByProducerName()) {
+ if (serviceConf.isAggregatePublisherStatsByProducerName() && stats.getProducerName() != null) {
// If true and the client supports partial producer,
// aggregate publisher stats of PartitionedTopicStats by producerName.
// Otherwise, aggregate it by list index.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index 35c665f881d..23e603ea028 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.policies.data.stats;
+import static java.util.Comparator.naturalOrder;
+import static java.util.Comparator.nullsLast;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -62,10 +64,10 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
@JsonProperty("publishers")
public List<NonPersistentPublisherStats> getNonPersistentPublishers() {
- return Stream.concat(nonPersistentPublishers.stream()
- .sorted(Comparator.comparing(NonPersistentPublisherStats::getProducerName)),
- nonPersistentPublishersMap.values().stream()
- .sorted(Comparator.comparing(NonPersistentPublisherStats::getProducerName)))
+ return Stream.concat(nonPersistentPublishers.stream().sorted(
+ Comparator.comparing(NonPersistentPublisherStats::getProducerName, nullsLast(naturalOrder()))),
+ nonPersistentPublishersMap.values().stream().sorted(
+ Comparator.comparing(NonPersistentPublisherStats::getProducerName, nullsLast(naturalOrder()))))
.collect(Collectors.toList());
}
@@ -92,10 +94,10 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
@SuppressFBWarnings(value = "MF_CLASS_MASKS_FIELD", justification = "expected to override")
public List<NonPersistentPublisherStats> getPublishers() {
- return Stream.concat(nonPersistentPublishers.stream()
- .sorted(Comparator.comparing(NonPersistentPublisherStats::getProducerName)),
- nonPersistentPublishersMap.values()
- .stream().sorted(Comparator.comparing(NonPersistentPublisherStats::getProducerName)))
+ return Stream.concat(nonPersistentPublishers.stream().sorted(
+ Comparator.comparing(NonPersistentPublisherStats::getProducerName, nullsLast(naturalOrder()))),
+ nonPersistentPublishersMap.values().stream().sorted(
+ Comparator.comparing(NonPersistentPublisherStats::getProducerName, nullsLast(naturalOrder()))))
.collect(Collectors.toList());
}
@@ -106,9 +108,10 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
}
public void addPublisher(NonPersistentPublisherStatsImpl stats) {
- if (stats.isSupportsPartialProducer()) {
+ if (stats.isSupportsPartialProducer() && stats.getProducerName() != null) {
nonPersistentPublishersMap.put(stats.getProducerName(), stats);
} else {
+ stats.setSupportsPartialProducer(false); // setter method with side effect
nonPersistentPublishers.add(stats);
}
}
@@ -153,7 +156,7 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
this.msgDropRate += stats.msgDropRate;
stats.getNonPersistentPublishers().forEach(s -> {
- if (s.isSupportsPartialProducer()) {
+ if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
((NonPersistentPublisherStatsImpl) this.nonPersistentPublishersMap
.computeIfAbsent(s.getProducerName(), key -> {
final NonPersistentPublisherStatsImpl newStats = new NonPersistentPublisherStatsImpl();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 18e7d60054f..3e90ca9be94 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.policies.data.stats;
+import static java.util.Comparator.naturalOrder;
+import static java.util.Comparator.nullsLast;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.ArrayList;
import java.util.Comparator;
@@ -133,8 +135,10 @@ public class TopicStatsImpl implements TopicStats {
public CompactionStatsImpl compaction;
public List<? extends PublisherStats> getPublishers() {
- return Stream.concat(publishers.stream().sorted(Comparator.comparing(PublisherStatsImpl::getProducerName)),
- publishersMap.values().stream().sorted(Comparator.comparing(PublisherStatsImpl::getProducerName)))
+ return Stream.concat(publishers.stream().sorted(
+ Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))),
+ publishersMap.values().stream().sorted(
+ Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))))
.collect(Collectors.toList());
}
@@ -145,9 +149,10 @@ public class TopicStatsImpl implements TopicStats {
}
public void addPublisher(PublisherStatsImpl stats) {
- if (stats.isSupportsPartialProducer()) {
+ if (stats.isSupportsPartialProducer() && stats.getProducerName() != null) {
publishersMap.put(stats.getProducerName(), stats);
} else {
+ stats.setSupportsPartialProducer(false); // setter method with side effect
publishers.add(stats);
}
}
@@ -223,7 +228,7 @@ public class TopicStatsImpl implements TopicStats {
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
stats.getPublishers().forEach(s -> {
- if (s.isSupportsPartialProducer()) {
+ if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
final PublisherStatsImpl newStats = new PublisherStatsImpl();
newStats.setSupportsPartialProducer(true);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
index 311ed432524..001eaec901f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
@@ -22,9 +22,11 @@ import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopi
import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
public class NonPersistentPartitionedTopicStatsTest {
@@ -55,4 +57,42 @@ public class NonPersistentPartitionedTopicStatsTest {
assertEquals(nonPersistentPartitionedTopicStats.metadata.partitions, 0);
assertEquals(nonPersistentPartitionedTopicStats.partitions.size(), 0);
}
+
+ @Test
+ public void testPartitionedTopicStatsByNullProducerName() {
+ final NonPersistentTopicStatsImpl topicStats1 = new NonPersistentTopicStatsImpl();
+ final NonPersistentPublisherStatsImpl publisherStats1 = new NonPersistentPublisherStatsImpl();
+ publisherStats1.setSupportsPartialProducer(false);
+ publisherStats1.setProducerName(null);
+ final NonPersistentPublisherStatsImpl publisherStats2 = new NonPersistentPublisherStatsImpl();
+ publisherStats2.setSupportsPartialProducer(false);
+ publisherStats2.setProducerName(null);
+ topicStats1.addPublisher(publisherStats1);
+ topicStats1.addPublisher(publisherStats2);
+
+ assertEquals(topicStats1.getPublishers().size(), 2);
+ assertFalse(topicStats1.getPublishers().get(0).isSupportsPartialProducer());
+ assertFalse(topicStats1.getPublishers().get(1).isSupportsPartialProducer());
+
+ final NonPersistentTopicStatsImpl topicStats2 = new NonPersistentTopicStatsImpl();
+ final NonPersistentPublisherStatsImpl publisherStats3 = new NonPersistentPublisherStatsImpl();
+ publisherStats3.setSupportsPartialProducer(true);
+ publisherStats3.setProducerName(null);
+ final NonPersistentPublisherStatsImpl publisherStats4 = new NonPersistentPublisherStatsImpl();
+ publisherStats4.setSupportsPartialProducer(true);
+ publisherStats4.setProducerName(null);
+ topicStats2.addPublisher(publisherStats3);
+ topicStats2.addPublisher(publisherStats4);
+
+ assertEquals(topicStats2.getPublishers().size(), 2);
+ // when the producerName is null, fall back to false
+ assertFalse(topicStats2.getPublishers().get(0).isSupportsPartialProducer());
+ assertFalse(topicStats2.getPublishers().get(1).isSupportsPartialProducer());
+
+ final NonPersistentPartitionedTopicStatsImpl target = new NonPersistentPartitionedTopicStatsImpl();
+ target.add(topicStats1);
+ target.add(topicStats2);
+
+ assertEquals(target.getPublishers().size(), 2);
+ }
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index 0eb1997ffd2..0da2b9f8abb 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.policies.data;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
@@ -235,4 +236,41 @@ public class PersistentTopicStatsTest {
assertEquals(target.replication.size(), 1);
}
+ @Test
+ public void testPersistentTopicStatsByNullProducerName() {
+ final TopicStatsImpl topicStats1 = new TopicStatsImpl();
+ final PublisherStatsImpl publisherStats1 = new PublisherStatsImpl();
+ publisherStats1.setSupportsPartialProducer(false);
+ publisherStats1.setProducerName(null);
+ final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl();
+ publisherStats2.setSupportsPartialProducer(false);
+ publisherStats2.setProducerName(null);
+ topicStats1.addPublisher(publisherStats1);
+ topicStats1.addPublisher(publisherStats2);
+
+ assertEquals(topicStats1.getPublishers().size(), 2);
+ assertFalse(topicStats1.getPublishers().get(0).isSupportsPartialProducer());
+ assertFalse(topicStats1.getPublishers().get(1).isSupportsPartialProducer());
+
+ final TopicStatsImpl topicStats2 = new TopicStatsImpl();
+ final PublisherStatsImpl publisherStats3 = new PublisherStatsImpl();
+ publisherStats3.setSupportsPartialProducer(true);
+ publisherStats3.setProducerName(null);
+ final PublisherStatsImpl publisherStats4 = new PublisherStatsImpl();
+ publisherStats4.setSupportsPartialProducer(true);
+ publisherStats4.setProducerName(null);
+ topicStats2.addPublisher(publisherStats3);
+ topicStats2.addPublisher(publisherStats4);
+
+ assertEquals(topicStats2.getPublishers().size(), 2);
+ // when the producerName is null, fall back to false
+ assertFalse(topicStats2.getPublishers().get(0).isSupportsPartialProducer());
+ assertFalse(topicStats2.getPublishers().get(1).isSupportsPartialProducer());
+
+ final TopicStatsImpl target = new TopicStatsImpl();
+ target.add(topicStats1);
+ target.add(topicStats2);
+
+ assertEquals(target.getPublishers().size(), 2);
+ }
}