You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/04/17 01:30:36 UTC

[pulsar] branch master updated: PIP-82 incorporate review feedback (#10201)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cc782f  PIP-82 incorporate review feedback (#10201)
1cc782f is described below

commit 1cc782f88b8934e1c636d45d9f6fc25311ebe260
Author: Bharani Chadalavada <bh...@gmail.com>
AuthorDate: Fri Apr 16 18:29:27 2021 -0700

    PIP-82 incorporate review feedback (#10201)
---
 pom.xml                                            |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ---
 pulsar-broker/pom.xml                              |  2 +
 .../org/apache/pulsar/broker/PulsarService.java    |  5 +++
 .../ResourceUsageTransportManager.java             | 44 ++++++++++++++++++----
 .../ResourceUsageTransportManagerTest.java         |  5 +--
 6 files changed, 45 insertions(+), 19 deletions(-)

diff --git a/pom.xml b/pom.xml
index d7b5b72..209dcc1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,7 +235,7 @@ flexible messaging model and an intuitive client API.</description>
     <errorprone.version>2.5.1</errorprone.version>
     <errorprone.javac.version>9+181-r4173-1</errorprone.javac.version>
     <errorprone-slf4j.version>0.1.4</errorprone-slf4j.version>
-    <lightproto-maven-plugin.version>0.2</lightproto-maven-plugin.version>
+    <lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
 
     <!-- Used to configure rename.netty.native. Libs -->
     <rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 856f826..358345c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -688,12 +688,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private String resourceUsageTransportClassName = "";
 
     @FieldContext(
-            category = CATEGORY_POLICIES,
-            doc = "Topic to publish usage reports to if resourceUsagePublishToTopic is enabled."
-    )
-    private String resourceUsageTransportPublishTopicName = "non-persistent://pulsar/system/resource-usage";
-
-    @FieldContext(
             dynamic = true,
             category = CATEGORY_POLICIES,
             doc = "Default interval to publish usage reports if resourceUsagePublishToTopic is enabled."
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 49609c2..cb13048 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -491,6 +491,8 @@
         <version>${lightproto-maven-plugin.version}</version>
         <configuration>
           <sources>${project.basedir}/src/main/proto/ResourceUsage.proto</sources>
+          <targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
+          <targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
         </configuration>
         <executions>
           <execution>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 17bb041..8db0b1d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -340,6 +340,11 @@ public class PulsarService implements AutoCloseable {
             state = State.Closing;
 
             // close the service in reverse order v.s. in which they are started
+            if (this.resourceUsageTransportManager != null) {
+                this.resourceUsageTransportManager.close();
+                this.resourceUsageTransportManager = null;
+            }
+
             if (this.webService != null) {
                 try {
                     this.webService.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
index f7064e4..227bce0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
@@ -66,7 +66,7 @@ public class ResourceUsageTransportManager implements AutoCloseable {
             final int sendTimeoutSecs = 10;
 
             return pulsarClient.newProducer()
-                    .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
+                    .topic(RESOURCE_USAGE_TOPIC_NAME)
                     .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
                     .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
                     .blockIfQueueFull(false)
@@ -113,11 +113,12 @@ public class ResourceUsageTransportManager implements AutoCloseable {
 
     private class ResourceUsageReader implements ReaderListener<byte[]>, AutoCloseable {
         private final ResourceUsageInfo recdUsageInfo = new ResourceUsageInfo();
+
         private final Reader<byte[]> consumer;
 
         public ResourceUsageReader() throws PulsarClientException {
             consumer =  pulsarClient.newReader()
-                    .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
+                    .topic(RESOURCE_USAGE_TOPIC_NAME)
                     .startMessageId(MessageId.latest)
                     .readerListener(this)
                     .create();
@@ -130,9 +131,19 @@ public class ResourceUsageTransportManager implements AutoCloseable {
 
         @Override
         public void received(Reader<byte[]> reader, Message<byte[]> msg) {
-            try {
-                recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(msg.getData()), msg.getData().length);
+            long publishTime = msg.getPublishTime();
+            long currentTime = System.currentTimeMillis();
+            long timeDelta = currentTime - publishTime;
 
+            recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(msg.getData()), msg.getData().length);
+            if (timeDelta > TimeUnit.SECONDS.toMillis(
+              2 * pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs())) {
+                LOG.error("Stale resource usage msg from broker {} publish time {} current time{}",
+                  recdUsageInfo.getBroker(), publishTime, currentTime);
+                staleMessageCount++;
+                return;
+            }
+            try {
                 recdUsageInfo.getUsageMapsList().forEach(ru -> {
                     ResourceUsageConsumer owner = consumerMap.get(ru.getOwner());
                     if (owner != null) {
@@ -150,6 +161,7 @@ public class ResourceUsageTransportManager implements AutoCloseable {
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTransportManager.class);
+    public static final String RESOURCE_USAGE_TOPIC_NAME = "non-persistent://pulsar/system/resource-usage";
     private final PulsarService pulsarService;
     private final PulsarClient pulsarClient;
     private final ResourceUsageWriterTask pTask;
@@ -159,9 +171,11 @@ public class ResourceUsageTransportManager implements AutoCloseable {
     private final Map<String, ResourceUsageConsumer>
             consumerMap = new ConcurrentHashMap<String, ResourceUsageConsumer>();
 
+    private long staleMessageCount = 0;
+
     private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
         // Create a public tenant and default namespace
-        TopicName topicName = TopicName.get(pulsarService.getConfig().getResourceUsageTransportPublishTopicName());
+        TopicName topicName = TopicName.get(RESOURCE_USAGE_TOPIC_NAME);
 
         PulsarAdmin admin = pulsarService.getAdminClient();
         ServiceConfiguration config = pulsarService.getConfig();
@@ -172,12 +186,26 @@ public class ResourceUsageTransportManager implements AutoCloseable {
 
         List<String> tenantList =  admin.tenants().getTenants();
         if (!tenantList.contains(tenant)) {
-            admin.tenants().createTenant(tenant,
-                    new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
+            try {
+                admin.tenants().createTenant(tenant,
+                  new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
+            } catch (PulsarAdminException ex1) {
+                if (!(ex1 instanceof PulsarAdminException.ConflictException)) {
+                    LOG.error("Unexpected exception {} when creating tenant {}", ex1, tenant);
+                    throw ex1;
+                }
+            }
         }
         List<String> nsList = admin.namespaces().getNamespaces(tenant);
         if (!nsList.contains(namespace)) {
-            admin.namespaces().createNamespace(namespace);
+            try {
+                admin.namespaces().createNamespace(namespace);
+            } catch (PulsarAdminException ex1) {
+                if (!(ex1 instanceof PulsarAdminException.ConflictException)) {
+                    LOG.error("Unexpected exception {} when creating namespace {}", ex1, namespace);
+                    throw ex1;
+                }
+            }
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
index 4c1f186..89246d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
@@ -34,7 +34,6 @@ import static org.testng.Assert.assertNotNull;
 
 public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTest {
 
-    private static final String INTERNAL_TOPIC = "non-persistent://pulsar-test/test/resource-usage";
     private static final int PUBLISH_INTERVAL_SECS = 1;
 
     @BeforeClass
@@ -53,11 +52,10 @@ public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTe
     @Test
     public void testNamespaceCreation() throws Exception {
         ResourceUsageTransportManager tManager = new ResourceUsageTransportManager(pulsar);
-        TopicName topicName = TopicName.get(INTERNAL_TOPIC);
+        TopicName topicName = TopicName.get(ResourceUsageTransportManager.RESOURCE_USAGE_TOPIC_NAME);
 
         assertTrue(admin.tenants().getTenants().contains(topicName.getTenant()));
         assertTrue(admin.namespaces().getNamespaces(topicName.getTenant()).contains(topicName.getNamespace()));
-
     }
     
     @Test
@@ -116,7 +114,6 @@ public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTe
 
     private void prepareData() throws PulsarAdminException {
         this.conf.setResourceUsageTransportClassName("org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager");
-        this.conf.setResourceUsageTransportPublishTopicName(INTERNAL_TOPIC);
         this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
         admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
     }