You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:06:07 UTC

[pulsar] 25/31: [fix][broker] Fix to avoid TopicStatsImpl NPE even if producerName is null (#15502)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d23e251a9b3070d2c7716afb01170667957588ba
Author: Yuri Mizushima <yu...@yahoo-corp.jp>
AuthorDate: Sat May 14 14:40:46 2022 +0900

    [fix][broker] Fix to avoid TopicStatsImpl NPE even if producerName is null (#15502)
    
    (cherry picked from commit 7bdfa3a3e5e71c44a174b28d0a843fb6730865fa)
---
 .../org/apache/pulsar/broker/service/Producer.java |  2 +-
 .../data/stats/NonPersistentTopicStatsImpl.java    | 23 +++++++------
 .../common/policies/data/stats/TopicStatsImpl.java | 13 ++++---
 .../NonPersistentPartitionedTopicStatsTest.java    | 40 ++++++++++++++++++++++
 .../policies/data/PersistentTopicStatsTest.java    | 38 ++++++++++++++++++++
 5 files changed, 101 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 99d21db569b..f8591a8447a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -128,7 +128,7 @@ public class Producer {
         stats.setClientVersion(cnx.getClientVersion());
         stats.setProducerName(producerName);
         stats.producerId = producerId;
-        if (serviceConf.isAggregatePublisherStatsByProducerName()) {
+        if (serviceConf.isAggregatePublisherStatsByProducerName() && stats.getProducerName() != null) {
             // If true and the client supports partial producer,
             // aggregate publisher stats of PartitionedTopicStats by producerName.
             // Otherwise, aggregate it by list index.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index 35c665f881d..23e603ea028 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.common.policies.data.stats;
 
+import static java.util.Comparator.naturalOrder;
+import static java.util.Comparator.nullsLast;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -62,10 +64,10 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
 
     @JsonProperty("publishers")
     public List<NonPersistentPublisherStats> getNonPersistentPublishers() {
-        return Stream.concat(nonPersistentPublishers.stream()
-                                .sorted(Comparator.comparing(NonPersistentPublisherStats::getProducerName)),
-                nonPersistentPublishersMap.values().stream()
-                        .sorted(Comparator.comparing(NonPersistentPublisherStats::getProducerName)))
+        return Stream.concat(nonPersistentPublishers.stream().sorted(
+                        Comparator.comparing(NonPersistentPublisherStats::getProducerName, nullsLast(naturalOrder()))),
+                nonPersistentPublishersMap.values().stream().sorted(
+                        Comparator.comparing(NonPersistentPublisherStats::getProducerName, nullsLast(naturalOrder()))))
                 .collect(Collectors.toList());
     }
 
@@ -92,10 +94,10 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
 
     @SuppressFBWarnings(value = "MF_CLASS_MASKS_FIELD", justification = "expected to override")
     public List<NonPersistentPublisherStats> getPublishers() {
-        return Stream.concat(nonPersistentPublishers.stream()
-                                .sorted(Comparator.comparing(NonPersistentPublisherStats::getProducerName)),
-                nonPersistentPublishersMap.values()
-                        .stream().sorted(Comparator.comparing(NonPersistentPublisherStats::getProducerName)))
+        return Stream.concat(nonPersistentPublishers.stream().sorted(
+                        Comparator.comparing(NonPersistentPublisherStats::getProducerName, nullsLast(naturalOrder()))),
+                nonPersistentPublishersMap.values().stream().sorted(
+                        Comparator.comparing(NonPersistentPublisherStats::getProducerName, nullsLast(naturalOrder()))))
                 .collect(Collectors.toList());
     }
 
@@ -106,9 +108,10 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
     }
 
     public void addPublisher(NonPersistentPublisherStatsImpl stats) {
-        if (stats.isSupportsPartialProducer()) {
+        if (stats.isSupportsPartialProducer() && stats.getProducerName() != null) {
             nonPersistentPublishersMap.put(stats.getProducerName(), stats);
         } else {
+            stats.setSupportsPartialProducer(false); // clear flag: list entries are not aggregated by name
             nonPersistentPublishers.add(stats);
         }
     }
@@ -153,7 +156,7 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
         this.msgDropRate += stats.msgDropRate;
 
         stats.getNonPersistentPublishers().forEach(s -> {
-            if (s.isSupportsPartialProducer()) {
+            if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
                 ((NonPersistentPublisherStatsImpl) this.nonPersistentPublishersMap
                         .computeIfAbsent(s.getProducerName(), key -> {
                             final NonPersistentPublisherStatsImpl newStats = new NonPersistentPublisherStatsImpl();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 18e7d60054f..3e90ca9be94 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.common.policies.data.stats;
 
+import static java.util.Comparator.naturalOrder;
+import static java.util.Comparator.nullsLast;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -133,8 +135,10 @@ public class TopicStatsImpl implements TopicStats {
     public CompactionStatsImpl compaction;
 
     public List<? extends PublisherStats> getPublishers() {
-        return Stream.concat(publishers.stream().sorted(Comparator.comparing(PublisherStatsImpl::getProducerName)),
-                publishersMap.values().stream().sorted(Comparator.comparing(PublisherStatsImpl::getProducerName)))
+        return Stream.concat(publishers.stream().sorted(
+                                Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))),
+                        publishersMap.values().stream().sorted(
+                                Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))))
                 .collect(Collectors.toList());
     }
 
@@ -145,9 +149,10 @@ public class TopicStatsImpl implements TopicStats {
     }
 
     public void addPublisher(PublisherStatsImpl stats) {
-        if (stats.isSupportsPartialProducer()) {
+        if (stats.isSupportsPartialProducer() && stats.getProducerName() != null) {
             publishersMap.put(stats.getProducerName(), stats);
         } else {
+            stats.setSupportsPartialProducer(false); // clear flag: list entries are not aggregated by name
             publishers.add(stats);
         }
     }
@@ -223,7 +228,7 @@ public class TopicStatsImpl implements TopicStats {
         this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
 
         stats.getPublishers().forEach(s -> {
-           if (s.isSupportsPartialProducer()) {
+           if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
                this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
                    final PublisherStatsImpl newStats = new PublisherStatsImpl();
                    newStats.setSupportsPartialProducer(true);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
index 311ed432524..001eaec901f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
@@ -22,9 +22,11 @@ import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopi
 import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 
 public class NonPersistentPartitionedTopicStatsTest {
 
@@ -55,4 +57,42 @@ public class NonPersistentPartitionedTopicStatsTest {
         assertEquals(nonPersistentPartitionedTopicStats.metadata.partitions, 0);
         assertEquals(nonPersistentPartitionedTopicStats.partitions.size(), 0);
     }
+
+    @Test
+    public void testPartitionedTopicStatsByNullProducerName() {
+        final NonPersistentTopicStatsImpl topicStats1 = new NonPersistentTopicStatsImpl();
+        final NonPersistentPublisherStatsImpl publisherStats1 = new NonPersistentPublisherStatsImpl();
+        publisherStats1.setSupportsPartialProducer(false);
+        publisherStats1.setProducerName(null);
+        final NonPersistentPublisherStatsImpl publisherStats2 = new NonPersistentPublisherStatsImpl();
+        publisherStats2.setSupportsPartialProducer(false);
+        publisherStats2.setProducerName(null);
+        topicStats1.addPublisher(publisherStats1);
+        topicStats1.addPublisher(publisherStats2);
+
+        assertEquals(topicStats1.getPublishers().size(), 2);
+        assertFalse(topicStats1.getPublishers().get(0).isSupportsPartialProducer());
+        assertFalse(topicStats1.getPublishers().get(1).isSupportsPartialProducer());
+
+        final NonPersistentTopicStatsImpl topicStats2 = new NonPersistentTopicStatsImpl();
+        final NonPersistentPublisherStatsImpl publisherStats3 = new NonPersistentPublisherStatsImpl();
+        publisherStats3.setSupportsPartialProducer(true);
+        publisherStats3.setProducerName(null);
+        final NonPersistentPublisherStatsImpl publisherStats4 = new NonPersistentPublisherStatsImpl();
+        publisherStats4.setSupportsPartialProducer(true);
+        publisherStats4.setProducerName(null);
+        topicStats2.addPublisher(publisherStats3);
+        topicStats2.addPublisher(publisherStats4);
+
+        assertEquals(topicStats2.getPublishers().size(), 2);
+        // a null producerName makes addPublisher() reset supportsPartialProducer to false
+        assertFalse(topicStats2.getPublishers().get(0).isSupportsPartialProducer());
+        assertFalse(topicStats2.getPublishers().get(1).isSupportsPartialProducer());
+
+        final NonPersistentPartitionedTopicStatsImpl target = new NonPersistentPartitionedTopicStatsImpl();
+        target.add(topicStats1);
+        target.add(topicStats2);
+
+        assertEquals(target.getPublishers().size(), 2);
+    }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index 0eb1997ffd2..0da2b9f8abb 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.policies.data;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 
 import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
@@ -235,4 +236,41 @@ public class PersistentTopicStatsTest {
         assertEquals(target.replication.size(), 1);
     }
 
+    @Test
+    public void testPersistentTopicStatsByNullProducerName() {
+        final TopicStatsImpl topicStats1 = new TopicStatsImpl();
+        final PublisherStatsImpl publisherStats1 = new PublisherStatsImpl();
+        publisherStats1.setSupportsPartialProducer(false);
+        publisherStats1.setProducerName(null);
+        final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl();
+        publisherStats2.setSupportsPartialProducer(false);
+        publisherStats2.setProducerName(null);
+        topicStats1.addPublisher(publisherStats1);
+        topicStats1.addPublisher(publisherStats2);
+
+        assertEquals(topicStats1.getPublishers().size(), 2);
+        assertFalse(topicStats1.getPublishers().get(0).isSupportsPartialProducer());
+        assertFalse(topicStats1.getPublishers().get(1).isSupportsPartialProducer());
+
+        final TopicStatsImpl topicStats2 = new TopicStatsImpl();
+        final PublisherStatsImpl publisherStats3 = new PublisherStatsImpl();
+        publisherStats3.setSupportsPartialProducer(true);
+        publisherStats3.setProducerName(null);
+        final PublisherStatsImpl publisherStats4 = new PublisherStatsImpl();
+        publisherStats4.setSupportsPartialProducer(true);
+        publisherStats4.setProducerName(null);
+        topicStats2.addPublisher(publisherStats3);
+        topicStats2.addPublisher(publisherStats4);
+
+        assertEquals(topicStats2.getPublishers().size(), 2);
+        // a null producerName makes addPublisher() reset supportsPartialProducer to false
+        assertFalse(topicStats2.getPublishers().get(0).isSupportsPartialProducer());
+        assertFalse(topicStats2.getPublishers().get(1).isSupportsPartialProducer());
+
+        final TopicStatsImpl target = new TopicStatsImpl();
+        target.add(topicStats1);
+        target.add(topicStats2);
+
+        assertEquals(target.getPublishers().size(), 2);
+    }
 }