You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/22 05:38:49 UTC

(pulsar) branch branch-3.2 updated (14841768466 -> e7ff0678e56)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 14841768466 [improve][offload] Apply autoSkipNonRecoverableData configuration to tiered storage (#22531)
     new 49bcbb6c3da [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)
     new 72d290abd49 [fix][broker] Fix message drop record in producer stat (#22458)
     new 8d0a412eb93 [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)
     new bf89f080744 [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)
     new f97a0bdb973 [improve] Make the config `metricsBufferResponse` description more effective (#22490)
     new 5d474657869 [improve][broker] Add topic name to emitted error messages. (#22506)
     new becbaf198c5 [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)
     new 50293b75654 [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)
     new 91ce98d4eb6 [fix][broker] Fix typos in Consumer class (#22532)
     new e7ff0678e56 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mledger/impl/ShadowManagedLedgerImplTest.java  |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  9 ++-
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../pulsar/broker/service/AbstractTopic.java       | 69 +++++++++++++---------
 .../pulsar/broker/service/BrokerService.java       |  4 +-
 .../org/apache/pulsar/broker/service/Consumer.java |  6 +-
 .../org/apache/pulsar/broker/service/Producer.java |  2 +-
 .../broker/service/persistent/PersistentTopic.java |  4 +-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 45 ++++++++++++++
 ...kerInternalClientConfigurationOverrideTest.java | 42 ++++++++++++-
 .../client/api/SimpleProducerConsumerTest.java     | 66 ++++++++++++---------
 12 files changed, 185 insertions(+), 72 deletions(-)


(pulsar) 04/10: [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bf89f0807446f6e65d9db4cff24cce974e34eb01
Author: sinan liu <li...@gmail.com>
AuthorDate: Tue Apr 16 21:19:44 2024 +0800

    [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)
    
    (cherry picked from commit ffdfc0c4e0881c682132e79c3cbf9768b1ab4f89)
---
 .../client/api/SimpleProducerConsumerTest.java     | 66 +++++++++++++---------
 1 file changed, 38 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7552b84a1c5..691f501777e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4329,6 +4329,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     public void testAccessAvroSchemaMetadata(Schema<MyBean> schema) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
+        if (pulsarClient == null) {
+            pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        }
+
         final String topic = "persistent://my-property/my-ns/accessSchema";
         Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic)
@@ -4344,37 +4348,43 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         producer.send(payload);
         producer.close();
 
-        GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
-        consumer.close();
-        assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
-        org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
-        JsonNode nativeJsonRecord = null;
-        if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
-            nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
-            assertNotNull(nativeAvroRecord);
-        } else {
-            nativeJsonRecord = (JsonNode) res.getNativeObject();
-            assertNotNull(nativeJsonRecord);
-        }
-        for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
-            log.info("field {} {}", f.getName(), res.getField(f));
-            assertEquals("field", f.getName());
-            assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
-
-            if (nativeAvroRecord != null) {
-                // test that the native schema is accessible
-                org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName());
-                // a nullable string is an UNION
-                assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType());
-                assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING));
-                assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL));
+        try {
+            GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
+            consumer.close();
+            assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
+            org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
+            JsonNode nativeJsonRecord = null;
+            if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+                nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
+                assertNotNull(nativeAvroRecord);
             } else {
-                assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType());
+                nativeJsonRecord = (JsonNode) res.getNativeObject();
+                assertNotNull(nativeJsonRecord);
+            }
+            for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
+                log.info("field {} {}", f.getName(), res.getField(f));
+                assertEquals("field", f.getName());
+                assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
+
+                if (nativeAvroRecord != null) {
+                    // test that the native schema is accessible
+                    org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName());
+                    // a nullable string is an UNION
+                    assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType());
+                    assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING));
+                    assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL));
+                } else {
+                    assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType());
+                }
             }
+            assertEquals(1, res.getFields().size());
+        } catch (Exception e) {
+            fail();
+        } finally {
+            pulsarClient.shutdown();
+            pulsarClient = null;
+            admin.schemas().deleteSchema(topic);
         }
-        assertEquals(1, res.getFields().size());
-
-        admin.schemas().deleteSchema(topic);
     }
 
     @Test(timeOut = 100000)


(pulsar) 08/10: [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 50293b7565483f5b074ca43d9b6df0836d10cb0e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Apr 17 12:46:43 2024 -0700

    [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)
    
    (cherry picked from commit 56970b714f5adb606b02d12a99db1ceec3fa7832)
---
 .../org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
index cc4b3f24811..2aa04197ab9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
@@ -51,7 +51,7 @@ public class ShadowManagedLedgerImplTest extends MockedBookKeeperTestCase {
         return (ShadowManagedLedgerImpl) shadowML;
     }
 
-    @Test
+    @Test(groups = "flaky")
     public void testShadowWrites() throws Exception {
         ManagedLedgerImpl sourceML = (ManagedLedgerImpl) factory.open("source_ML", new ManagedLedgerConfig()
                 .setMaxEntriesPerLedger(2)


(pulsar) 07/10: [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit becbaf198c5d399375c6149d3493391c382232e0
Author: hanmz <gu...@tencent.com>
AuthorDate: Wed Apr 17 18:14:38 2024 +0800

    [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)
    
    (cherry picked from commit 1dd82a0affd6ec3686fa85d444c35bbbb4e9ce12)
---
 .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f19b3436f7b..dc66750fd96 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -336,7 +336,9 @@ public class BrokerService implements Closeable {
         this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration());
 
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
-        pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
+        if (pulsar.getConfigurationMetadataStore() != pulsar.getLocalMetadataStore()) {
+            pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
+        }
 
 
         this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()


(pulsar) 01/10: [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49bcbb6c3da2246bd887605fd68c10e1d3e9af28
Author: hanmz <gu...@tencent.com>
AuthorDate: Wed Apr 10 04:27:22 2024 +0800

    [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)
    
    (cherry picked from commit fb5caeb2cd3353db0499e32e9ec79390741b809c)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  3 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 45 ++++++++++++++++++++++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8b99501f296..2e7d4d582d6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1351,7 +1351,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
             category = CATEGORY_SERVER,
             dynamic = true,
             doc = "The number of partitions per partitioned topic.\n"
-                + "If try to create or update partitioned topics by exceeded number of partitions, then fail."
+                + "If try to create or update partitioned topics by exceeded number of partitions, then fail.\n"
+                + "Use 0 or negative number to disable the check."
     )
     private int maxNumPartitionsPerPartitionedTopic = 0;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 318e2bc2cde..b0968f494ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -348,7 +348,7 @@ public class PersistentTopicsBase extends AdminResource {
                 }
                 int brokerMaximumPartitionsPerTopic = pulsarService.getConfiguration()
                         .getMaxNumPartitionsPerPartitionedTopic();
-                if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > brokerMaximumPartitionsPerTopic) {
+                if (brokerMaximumPartitionsPerTopic > 0 && expectPartitions > brokerMaximumPartitionsPerTopic) {
                     throw new RestException(422 /* Unprocessable entity*/,
                             String.format("Desired partitions %s can't be greater than the maximum partitions per"
                                             + " topic %s.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 8e1375303ce..c588051a0fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1742,6 +1742,51 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
         partitionedTopicMetadata = metaCaptor.getValue();
         Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+        // test for configuration maxNumPartitionsPerPartitionedTopic
+        conf.setMaxNumPartitionsPerPartitionedTopic(4);
+        response = mock(AsyncResponse.class);
+        throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
+        persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
+                true, 5);
+        verify(response, timeout(5000).times(1)).resume(throwableCaptor.capture());
+        Assert.assertEquals(throwableCaptor.getValue().getMessage(),
+                "Desired partitions 5 can't be greater than the maximum partitions per topic 4.");
+
+        response = mock(AsyncResponse.class);
+        metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+        conf.setMaxNumPartitionsPerPartitionedTopic(-1);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
+                true, 5);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+
+        response = mock(AsyncResponse.class);
+        metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 5);
+
+        conf.setMaxNumPartitionsPerPartitionedTopic(0);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
+                true, 6);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+
+        response = mock(AsyncResponse.class);
+        metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 6);
     }
 
     @Test


(pulsar) 03/10: [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d0a412eb93ed11dc3fd8cf12ac1d69d1ef25fc5
Author: Mukesh Kumar <65...@users.noreply.github.com>
AuthorDate: Fri Apr 12 22:07:28 2024 +0530

    [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)
    
    (cherry picked from commit b85730069ee4c5f96406a075e354d0592fdab434)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 29df6e78cd1..a76ac548e0c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3150,14 +3150,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
                     || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) {
                 if (backlogQuotaType == BacklogQuotaType.destination_storage && isSizeBacklogExceeded()) {
-                    log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(),
+                    log.debug("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(),
                             producerName);
                     return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
                 }
                 if (backlogQuotaType == BacklogQuotaType.message_age) {
                     return checkTimeBacklogExceeded().thenCompose(isExceeded -> {
                         if (isExceeded) {
-                            log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
+                            log.debug("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
                                     producerName);
                             return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
                         } else {


(pulsar) 05/10: [improve] Make the config `metricsBufferResponse` description more effective (#22490)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f97a0bdb973a1ed4dd13986e84a1ba55459dd4fe
Author: 道君 <da...@apache.org>
AuthorDate: Wed Apr 17 03:07:30 2024 +0800

    [improve] Make the config `metricsBufferResponse` description more effective (#22490)
    
    (cherry picked from commit 4ca4e2855267e3b36ee1a27f7144b89ba9194821)
---
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2e7d4d582d6..0fce252694f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2908,8 +2908,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean exposeTopicLevelMetricsInPrometheus = true;
     @FieldContext(
             category = CATEGORY_METRICS,
-            doc = "If true, export buffered metrics"
-    )
+            doc = "Set to true to enable the broker to cache the metrics response; the default is false. "
+                    + "The caching period is defined by `managedLedgerStatsPeriodSeconds`. "
+                    + "The broker returns the same response for subsequent requests within the same period. "
+                    + "Ensure that the scrape interval of your monitoring system matches the caching period.")
     private boolean metricsBufferResponse = false;
     @FieldContext(
         category = CATEGORY_METRICS,


(pulsar) 09/10: [fix][broker] Fix typos in Consumer class (#22532)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 91ce98d4eb63978dc7a4924fc92d327da9dffc67
Author: hanmz <gu...@tencent.com>
AuthorDate: Fri Apr 19 06:49:18 2024 +0800

    [fix][broker] Fix typos in Consumer class (#22532)
    
    (cherry picked from commit 7aedb6b20c120ec0a7cc096e33e6305caca26786)
---
 .../src/main/java/org/apache/pulsar/broker/service/Consumer.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4cd54420200..6b2028095e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -147,7 +147,7 @@ public class Consumer {
     @Setter
     private volatile long consumerEpoch;
 
-    private long negtiveUnackedMsgsTimestamp;
+    private long negativeUnackedMsgsTimestamp;
 
     @Getter
     private final SchemaType schemaType;
@@ -1102,8 +1102,8 @@ public class Consumer {
             subscription.addUnAckedMessages(ackedMessages);
             unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
         }
-        if (unackedMsgs < 0 && System.currentTimeMillis() - negtiveUnackedMsgsTimestamp >= 10_000) {
-            negtiveUnackedMsgsTimestamp = System.currentTimeMillis();
+        if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) {
+            negativeUnackedMsgsTimestamp = System.currentTimeMillis();
             log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedMsgs, ackedMessages, consumer);
         }
         return unackedMsgs;


(pulsar) 06/10: [improve][broker] Add topic name to emitted error messages. (#22506)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d4746578695f828e0901cb14fd1e83837fe4b1a
Author: 道君 <da...@apache.org>
AuthorDate: Wed Apr 17 03:12:34 2024 +0800

    [improve][broker] Add topic name to emitted error messages. (#22506)
    
    (cherry picked from commit d5b36da9a2e0d4f17bea8e033180e494e93dc442)
---
 .../org/apache/pulsar/broker/service/AbstractTopic.java | 17 +++++++++--------
 .../org/apache/pulsar/broker/admin/AdminApi2Test.java   |  6 ++++--
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 837f073b00d..39c3cbf13b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -855,7 +855,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
             }
 
         } catch (Exception e) {
-            log.error("Encountered unexpected error during exclusive producer creation", e);
+            log.error("[{}] Encountered unexpected error during exclusive producer creation", topic, e);
             return FutureUtil.failedFuture(new BrokerServiceException(e));
         } finally {
             lock.writeLock().unlock();
@@ -929,14 +929,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     protected CompletableFuture<Void> internalAddProducer(Producer producer) {
         if (isProducersExceeded(producer)) {
             log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
-            return CompletableFuture.failedFuture(
-                    new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
+            return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
+                    "Topic '" + topic + "' reached max producers limit"));
         }
 
         if (isSameAddressProducersExceeded(producer)) {
             log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
-            return CompletableFuture.failedFuture(
-                    new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
+            return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
+                    "Topic '" + topic + "' reached max same address producers limit"));
         }
 
         if (log.isDebugEnabled()) {
@@ -971,7 +971,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
                     if (previousIsActive) {
                         return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
                                 "Producer with name '" + newProducer.getProducerName()
-                                        + "' is already connected to topic"));
+                                        + "' is already connected to topic '" + topic + "'"));
                     } else {
                         // If the connection of the previous producer is not active, the method
                         // "cnx().checkConnectionLiveness()" will trigger the close for it and kick off the previous
@@ -984,7 +984,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
                 });
             }
             return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
-                    "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
+                    "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic '"
+                            + topic + "'"));
         }
     }
 
@@ -1329,7 +1330,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
             return getMigratedClusterUrlAsync(pulsar, topic)
                     .get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS);
         } catch (Exception e) {
-            log.warn("Failed to get migration cluster URL", e);
+            log.warn("[{}] Failed to get migration cluster URL", topic, e);
         }
         return Optional.empty();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 38993388704..a6849793137 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2887,7 +2887,8 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
             Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
             fail("should fail");
         } catch (PulsarClientException e) {
-            assertTrue(e.getMessage().contains("Topic reached max producers limit"));
+            String expectMsg = "Topic '" + topic + "' reached max producers limit";
+            assertTrue(e.getMessage().contains(expectMsg));
         }
         //set the limit to 3
         admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
@@ -2901,7 +2902,8 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
             Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).create();
             fail("should fail");
         } catch (PulsarClientException e) {
-            assertTrue(e.getMessage().contains("Topic reached max producers limit"));
+            String expectMsg = "Topic '" + topic + "' reached max producers limit";
+            assertTrue(e.getMessage().contains(expectMsg));
         }
 
         //clean up


(pulsar) 10/10: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e7ff0678e56f605a8fb2344c7162f5dcd7e31072
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Apr 19 10:30:55 2024 -0700

    [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
    
    (cherry picked from commit 59daac64c210f539e733f883edad09d08333aa62)
---
 .../pulsar/broker/service/AbstractTopic.java       | 52 +++++++++++++---------
 ...kerInternalClientConfigurationOverrideTest.java | 42 ++++++++++++++++-
 2 files changed, 72 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 39c3cbf13b2..6ddcfc9ef4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -218,13 +218,16 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
                     .updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
         }
         topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
-        topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
-        topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
+        topicPolicies.getMaxSubscriptionsPerTopic()
+                .updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic()));
+        topicPolicies.getMaxUnackedMessagesOnConsumer()
+                .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer()));
         topicPolicies.getMaxUnackedMessagesOnSubscription()
-                .updateTopicValue(data.getMaxUnackedMessagesOnSubscription());
-        topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
-        topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
-        topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
+                .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription()));
+        topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic()));
+        topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic()));
+        topicPolicies.getMaxConsumersPerSubscription()
+                .updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription()));
         topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
         topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
         topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateTopicValue(
@@ -235,8 +238,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
                 this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
                         data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
-        topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
-        topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+        topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
+        topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
         topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
         topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
         topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -263,15 +266,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getReplicationClusters().updateNamespaceValue(
                 new ArrayList<>(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
         topicPolicies.getMaxUnackedMessagesOnConsumer()
-                .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer));
         topicPolicies.getMaxUnackedMessagesOnSubscription()
-                .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
-        topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
-        topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
-        topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
-        topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
+        topicPolicies.getMessageTTLInSeconds()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+        topicPolicies.getMaxSubscriptionsPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
+        topicPolicies.getMaxProducersPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic));
+        topicPolicies.getMaxConsumerPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic));
         topicPolicies.getMaxConsumersPerSubscription()
-                .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription));
         topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
         topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
         topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
@@ -301,6 +308,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         updateEntryFilters();
     }
 
+    private Integer normalizeValue(Integer policyValue) {
+        return policyValue != null && policyValue < 0 ? null : policyValue;
+    }
+
     private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
         DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
         if (dispatchRate == null) {
@@ -359,12 +370,11 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
         topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
         topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
-        topicPolicies.getRetentionPolicies().updateBrokerValue(new RetentionPolicies(
-                config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
-        topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
-                config.getBrokerDeduplicationSnapshotIntervalSeconds());
-        topicPolicies.getMaxUnackedMessagesOnConsumer()
-                .updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
+        topicPolicies.getRetentionPolicies().updateBrokerValue(
+                new RetentionPolicies(config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
+        topicPolicies.getDeduplicationSnapshotIntervalSeconds()
+                .updateBrokerValue(config.getBrokerDeduplicationSnapshotIntervalSeconds());
+        topicPolicies.getMaxUnackedMessagesOnConsumer().updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
         topicPolicies.getMaxUnackedMessagesOnSubscription()
                 .updateBrokerValue(config.getMaxUnackedMessagesPerSubscription());
         //init backlogQuota
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
index 1b1b383e930..f33202c3c40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertEquals;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-
+import lombok.Cleanup;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -112,4 +116,40 @@ public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBas
         Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
     }
 
+    @Test
+    public void testOldNamespacePolicy() throws Exception {
+        
+        String ns = "prop/oldNsWithDefaultNonNullValues";
+        String topic = "persistent://" + ns + "/t1";
+        Policies policies = new Policies();
+        policies.max_consumers_per_subscription = -1;
+        policies.max_consumers_per_topic = -1;
+        policies.max_producers_per_topic = -1;
+        policies.max_subscriptions_per_topic = -1;
+        policies.max_topics_per_namespace = -1;
+        policies.max_unacked_messages_per_consumer = -1;
+        policies.max_unacked_messages_per_subscription = -1;
+        admin.namespaces().createNamespace(ns, policies);
+        
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic).create();
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+        assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(),
+                conf.getMaxUnackedMessagesPerSubscription());
+        assertEquals(topicRef.topicPolicies.getMaxConsumersPerSubscription().get(),
+                conf.getMaxConsumersPerSubscription());
+        assertEquals(topicRef.topicPolicies.getMaxConsumerPerTopic().get(),
+                conf.getMaxConsumersPerTopic());
+        assertEquals(topicRef.topicPolicies.getMaxProducersPerTopic().get(),
+                conf.getMaxProducersPerTopic());
+        assertEquals(topicRef.topicPolicies.getMaxSubscriptionsPerTopic().get(),
+                conf.getMaxSubscriptionsPerTopic());
+        assertEquals(topicRef.topicPolicies.getTopicMaxMessageSize().get(),
+                conf.getMaxMessageSize());
+        assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnConsumer().get(),
+                conf.getMaxUnackedMessagesPerConsumer());
+        
+        
+    }
 }


(pulsar) 02/10: [fix][broker] Fix message drop record in producer stat (#22458)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72d290abd49743bfb94173e831b907e5780b180f
Author: zhangqian <50...@qq.com>
AuthorDate: Wed Apr 10 16:51:26 2024 +0800

    [fix][broker] Fix message drop record in producer stat (#22458)
    
    Co-authored-by: ceceezhang <ce...@tencent.com>
    (cherry picked from commit cea1a9ba9b576bf43f0a45ff8d65369b0f2bbb36)
---
 .../src/main/java/org/apache/pulsar/broker/service/Producer.java        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 7e4459505a5..9cfde67802b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -749,7 +749,7 @@ public class Producer {
         }
         if (this.isNonPersistentTopic) {
             msgDrop.calculateRate();
-            ((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getRate();
+            ((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getValueRate();
         }
     }