You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/28 14:22:32 UTC

[pulsar] branch branch-2.11 updated (96be15c3bff -> 99501835783)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 96be15c3bff [improve][broker] fix broker irrational behavior when it is closing (#17085)
     new 39f14bd1b5b [fix][test] Fix flaky test ResourceGroupUsageAggregationTest. testProduceConsumeUsageOnRG (#17617)
     new 99501835783 Fix NPE when ResourceGroupService execute scheduled task. (#17840)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    |   8 ++
 .../broker/resourcegroup/ResourceGroupService.java |  48 ++++++++--
 .../resourcegroup/ResourceGroupServiceTest.java    |   8 ++
 .../ResourceGroupUsageAggregationTest.java         | 105 ++++++++++++---------
 4 files changed, 117 insertions(+), 52 deletions(-)


[pulsar] 01/02: [fix][test] Fix flaky test ResourceGroupUsageAggregationTest. testProduceConsumeUsageOnRG (#17617)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 39f14bd1b5bdf4c0c0e2916d07234f6c0f51ee36
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Sep 16 15:51:40 2022 +0800

    [fix][test] Fix flaky test ResourceGroupUsageAggregationTest. testProduceConsumeUsageOnRG (#17617)
---
 .../broker/resourcegroup/ResourceGroupService.java |  11 +++
 .../ResourceGroupUsageAggregationTest.java         | 105 ++++++++++++---------
 2 files changed, 70 insertions(+), 46 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 79cba28d374..4bb1bc8ab24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.resourcegroup;
 
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Summary;
 import java.util.Map;
@@ -829,4 +830,14 @@ public class ResourceGroupService {
             .name("pulsar_resource_group_calculate_quota_secs")
             .help("Time required to calculate quota of all resource groups, in seconds.")
             .register();
+
+    @VisibleForTesting
+    ConcurrentHashMap getTopicConsumeStats() {
+        return this.topicConsumeStats;
+    }
+
+    @VisibleForTesting
+    ConcurrentHashMap getTopicProduceStats() {
+        return this.topicProduceStats;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
index a86035b71e0..1fd9271d9bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
@@ -24,6 +24,8 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCoun
 import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
 import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -37,16 +39,16 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-
-import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
-@Test(groups = "flaky")
 public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
     @BeforeClass
     @Override
@@ -120,10 +122,11 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
                 .create();
 
         Consumer<byte[]> consumer = null;
+        String subscriptionName = "my-subscription";
         try {
             consumer = pulsarClient.newConsumer()
                     .topic(topicString)
-                    .subscriptionName("my-subscription")
+                    .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
         } catch (PulsarClientException p) {
@@ -176,6 +179,20 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
                 true, true);
 
         consumer.close();
+        // cleanup the topic data.
+        CompletableFuture<Optional<Topic>> topicFuture = pulsar.getBrokerService().getTopics().remove(topicString);
+        if (topicFuture != null) {
+            Optional<Topic> optTopic = topicFuture.join();
+            if (optTopic.isPresent()) {
+                Topic topic = optTopic.get();
+                if (topic instanceof PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
+                    persistentTopic.getSubscription(subscriptionName).deleteForcefully();
+                }
+            }
+        }
+        rgs.getTopicConsumeStats().clear();
+        rgs.getTopicProduceStats().clear();
 
         rgs.unRegisterTenant(activeRgName, tenantString);
         rgs.unRegisterNameSpace(activeRgName, NamespaceName.get(nsString));
@@ -192,48 +209,44 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
                              boolean checkProduce, boolean checkConsume)
                                                                 throws InterruptedException, PulsarAdminException {
         BrokerService bs = pulsar.getBrokerService();
-        Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
-        for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
-            String mapTopicName = entry.getKey();
-            if (mapTopicName.equals(topicString)) {
-                TopicStatsImpl stats = entry.getValue();
-                if (checkProduce) {
-                    Assert.assertTrue(stats.bytesInCounter >= sentNumBytes);
-                    Assert.assertEquals(sentNumMsgs, stats.msgInCounter);
-                }
-                if (checkConsume) {
-                    Assert.assertTrue(stats.bytesOutCounter >= recvdNumBytes);
-                    Assert.assertEquals(recvdNumMsgs, stats.msgOutCounter);
-                }
-
-                if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
-                    rgs.aggregateResourceGroupLocalUsages();  // hack to ensure aggregator calculation without waiting
-                    BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
-                            ResourceGroupUsageStatsType.Cumulative);
-                    BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
-                            ResourceGroupUsageStatsType.Cumulative);
-
-                    // Re-do the getRGUsage.
-                    // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC.
-                    BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
-                            ResourceGroupUsageStatsType.Cumulative);
-                    BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
-                            ResourceGroupUsageStatsType.Cumulative);
-
-                    Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
-                    Assert.assertEquals(prodCounts1.messages, prodCounts.messages);
-                    Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
-                    Assert.assertEquals(consCounts1.messages, consCounts.messages);
-
-                    if (checkProduce) {
-                        Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
-                        Assert.assertEquals(sentNumMsgs, prodCounts.messages);
-                    }
-                    if (checkConsume) {
-                        Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
-                        Assert.assertEquals(recvdNumMsgs, consCounts.messages);
-                    }
-                }
+        Awaitility.await().untilAsserted(() -> {
+            TopicStatsImpl topicStats = bs.getTopicStats().get(topicString);
+            Assert.assertNotNull(topicStats);
+            if (checkProduce) {
+                Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes);
+                Assert.assertEquals(sentNumMsgs, topicStats.msgInCounter);
+            }
+            if (checkConsume) {
+                Assert.assertTrue(topicStats.bytesOutCounter >= recvdNumBytes);
+                Assert.assertEquals(recvdNumMsgs, topicStats.msgOutCounter);
+            }
+        });
+        if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
+            rgs.aggregateResourceGroupLocalUsages();  // hack to ensure aggregator calculation without waiting
+            BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
+                    ResourceGroupUsageStatsType.Cumulative);
+            BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
+                    ResourceGroupUsageStatsType.Cumulative);
+
+            // Re-do the getRGUsage.
+            // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC.
+            BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
+                    ResourceGroupUsageStatsType.Cumulative);
+            BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
+                    ResourceGroupUsageStatsType.Cumulative);
+
+            Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
+            Assert.assertEquals(prodCounts1.messages, prodCounts.messages);
+            Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
+            Assert.assertEquals(consCounts1.messages, consCounts.messages);
+
+            if (checkProduce) {
+                Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
+                Assert.assertEquals(sentNumMsgs, prodCounts.messages);
+            }
+            if (checkConsume) {
+                Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
+                Assert.assertEquals(recvdNumMsgs, consCounts.messages);
             }
         }
     }


[pulsar] 02/02: Fix NPE when ResourceGroupService execute scheduled task. (#17840)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 99501835783540daabe31b4600d801fbf5067fa4
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Sep 28 21:55:26 2022 +0800

    Fix NPE when ResourceGroupService execute scheduled task. (#17840)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  8 +++++
 .../broker/resourcegroup/ResourceGroupService.java | 37 ++++++++++++++++++----
 .../resourcegroup/ResourceGroupServiceTest.java    |  8 +++++
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b28318de89c..abc70480a8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -419,6 +419,14 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 }
                 this.resourceUsageTransportManager = null;
             }
+            if (this.resourceGroupServiceManager != null) {
+                try {
+                    this.resourceGroupServiceManager.close();
+                } catch (Exception e) {
+                    LOG.warn("ResourceGroupServiceManager closing failed {}", e.getMessage());
+                }
+                this.resourceGroupServiceManager = null;
+            }
 
             if (this.webService != null) {
                 try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 4bb1bc8ab24..c74681fdb73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory;
  *
  * @see PulsarService
  */
-public class ResourceGroupService {
+public class ResourceGroupService implements AutoCloseable{
     /**
      * Default constructor.
      */
@@ -303,6 +303,21 @@ public class ResourceGroupService {
         return this.namespaceToRGsMap.get(namespaceName);
     }
 
+    @Override
+    public void close() throws Exception {
+        if (aggregateLocalUsagePeriodicTask != null) {
+            aggregateLocalUsagePeriodicTask.cancel(true);
+        }
+        if (calculateQuotaPeriodicTask != null) {
+            calculateQuotaPeriodicTask.cancel(true);
+        }
+        resourceGroupsMap.clear();
+        tenantToRGsMap.clear();
+        namespaceToRGsMap.clear();
+        topicProduceStats.clear();
+        topicConsumeStats.clear();
+    }
+
     /**
      * Increments usage stats for the resource groups associated with the given namespace and tenant.
      * Expected to be called when a message is produced or consumed on a topic, or when we calculate
@@ -565,17 +580,17 @@ public class ResourceGroupService {
         ServiceConfiguration config = pulsar.getConfiguration();
         long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs();
         if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
-            if (this.aggreagteLocalUsagePeriodicTask == null) {
+            if (this.aggregateLocalUsagePeriodicTask == null) {
                 log.error("aggregateResourceGroupLocalUsages: Unable to find running task to cancel when "
                                 + "publish period changed from {} to {} {}",
                         this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, timeUnitScale);
             } else {
-                boolean cancelStatus = this.aggreagteLocalUsagePeriodicTask.cancel(true);
+                boolean cancelStatus = this.aggregateLocalUsagePeriodicTask.cancel(true);
                 log.info("aggregateResourceGroupLocalUsages: Got status={} in cancel of periodic "
                                 + "when publish period changed from {} to {} {}",
                         cancelStatus, this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, timeUnitScale);
             }
-            this.aggreagteLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
+            this.aggregateLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
                     catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
                     newPeriodInSeconds,
                     newPeriodInSeconds,
@@ -680,7 +695,7 @@ public class ResourceGroupService {
         ServiceConfiguration config = this.pulsar.getConfiguration();
         long periodInSecs = config.getResourceUsageTransportPublishIntervalInSecs();
         this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs;
-        this.aggreagteLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
+        this.aggregateLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
                     catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
                     periodInSecs,
                     periodInSecs,
@@ -737,7 +752,7 @@ public class ResourceGroupService {
 
 
     // The task that periodically re-calculates the quota budget for local usage.
-    private ScheduledFuture<?> aggreagteLocalUsagePeriodicTask;
+    private ScheduledFuture<?> aggregateLocalUsagePeriodicTask;
     private long aggregateLocalUsagePeriodInSeconds;
 
     // The task that periodically re-calculates the quota budget for local usage.
@@ -840,4 +855,14 @@ public class ResourceGroupService {
     ConcurrentHashMap getTopicProduceStats() {
         return this.topicProduceStats;
     }
+
+    @VisibleForTesting
+    ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
+        return this.aggregateLocalUsagePeriodicTask;
+    }
+
+    @VisibleForTesting
+    ScheduledFuture<?> getCalculateQuotaPeriodicTask() {
+        return this.calculateQuotaPeriodicTask;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
index e0e3ec9c16a..86dff398f97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
@@ -257,6 +257,14 @@ public class ResourceGroupServiceTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(rgs.getNumResourceGroups(), 0);
     }
 
+    @Test
+    public void testClose() throws Exception {
+        ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null);
+        service.close();
+        Assert.assertTrue(service.getAggregateLocalUsagePeriodicTask().isCancelled());
+        Assert.assertTrue(service.getCalculateQuotaPeriodicTask().isCancelled());
+    }
+
     private ResourceGroupService rgs;
     int numAnonymousQuotaCalculations;