You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/22 05:04:48 UTC

(pulsar) branch branch-3.0 updated (ff8d3b73437 -> 94f12543a9f)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from ff8d3b73437 [improve][offload] Apply autoSkipNonRecoverableData configuration to tiered storage (#22531)
     new a9e815d464c [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)
     new 386f6f0bf73 [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)
     new b41e7527158 [fix][broker] Fix message drop record in producer stat (#22458)
     new eb1b55ec681 [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)
     new 42ae91ae31b [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)
     new 94f12543a9f [improve] Make the config `metricsBufferResponse` description more effective (#22490)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  9 ++-
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../org/apache/pulsar/broker/service/Producer.java |  2 +-
 .../broker/service/persistent/PersistentTopic.java |  4 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 45 +++++++++++++++
 .../client/api/SimpleProducerConsumerTest.java     | 66 +++++++++++++---------
 7 files changed, 94 insertions(+), 36 deletions(-)


(pulsar) 05/06: [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 42ae91ae31bc1ca27a13deb4b48a90ac06c89388
Author: sinan liu <li...@gmail.com>
AuthorDate: Tue Apr 16 21:19:44 2024 +0800

    [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)
    
    (cherry picked from commit ffdfc0c4e0881c682132e79c3cbf9768b1ab4f89)
---
 .../client/api/SimpleProducerConsumerTest.java     | 66 +++++++++++++---------
 1 file changed, 38 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index e76d6d6962c..68158dd69a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4329,6 +4329,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     public void testAccessAvroSchemaMetadata(Schema<MyBean> schema) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
+        if (pulsarClient == null) {
+            pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        }
+
         final String topic = "persistent://my-property/my-ns/accessSchema";
         Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic)
@@ -4344,37 +4348,43 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         producer.send(payload);
         producer.close();
 
-        GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
-        consumer.close();
-        assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
-        org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
-        JsonNode nativeJsonRecord = null;
-        if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
-            nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
-            assertNotNull(nativeAvroRecord);
-        } else {
-            nativeJsonRecord = (JsonNode) res.getNativeObject();
-            assertNotNull(nativeJsonRecord);
-        }
-        for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
-            log.info("field {} {}", f.getName(), res.getField(f));
-            assertEquals("field", f.getName());
-            assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
-
-            if (nativeAvroRecord != null) {
-                // test that the native schema is accessible
-                org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName());
-                // a nullable string is an UNION
-                assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType());
-                assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING));
-                assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL));
+        try {
+            GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
+            consumer.close();
+            assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
+            org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
+            JsonNode nativeJsonRecord = null;
+            if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+                nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
+                assertNotNull(nativeAvroRecord);
             } else {
-                assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType());
+                nativeJsonRecord = (JsonNode) res.getNativeObject();
+                assertNotNull(nativeJsonRecord);
+            }
+            for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
+                log.info("field {} {}", f.getName(), res.getField(f));
+                assertEquals("field", f.getName());
+                assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
+
+                if (nativeAvroRecord != null) {
+                    // test that the native schema is accessible
+                    org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName());
+                    // a nullable string is an UNION
+                    assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType());
+                    assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING));
+                    assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL));
+                } else {
+                    assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType());
+                }
             }
+            assertEquals(1, res.getFields().size());
+        } catch (Exception e) {
+            fail();
+        } finally {
+            pulsarClient.shutdown();
+            pulsarClient = null;
+            admin.schemas().deleteSchema(topic);
         }
-        assertEquals(1, res.getFields().size());
-
-        admin.schemas().deleteSchema(topic);
     }
 
     @Test(timeOut = 100000)


(pulsar) 04/06: [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb1b55ec6815375d751ff764ebe29455101a3b61
Author: Mukesh Kumar <65...@users.noreply.github.com>
AuthorDate: Fri Apr 12 22:07:28 2024 +0530

    [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)
    
    (cherry picked from commit b85730069ee4c5f96406a075e354d0592fdab434)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1363ca0945d..85fdc182ccc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2925,14 +2925,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
                     || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) {
                 if (backlogQuotaType == BacklogQuotaType.destination_storage && isSizeBacklogExceeded()) {
-                    log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(),
+                    log.debug("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(),
                             producerName);
                     return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
                 }
                 if (backlogQuotaType == BacklogQuotaType.message_age) {
                     return checkTimeBacklogExceeded().thenCompose(isExceeded -> {
                         if (isExceeded) {
-                            log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
+                            log.debug("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
                                     producerName);
                             return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
                         } else {


(pulsar) 03/06: [fix][broker] Fix message drop record in producer stat (#22458)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b41e7527158da8e12dcd491dca4698c4a74d07ba
Author: zhangqian <50...@qq.com>
AuthorDate: Wed Apr 10 16:51:26 2024 +0800

    [fix][broker] Fix message drop record in producer stat (#22458)
    
    Co-authored-by: ceceezhang <ce...@tencent.com>
    (cherry picked from commit cea1a9ba9b576bf43f0a45ff8d65369b0f2bbb36)
---
 .../src/main/java/org/apache/pulsar/broker/service/Producer.java        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 53b79f06e8e..b077ae17a45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -741,7 +741,7 @@ public class Producer {
         }
         if (this.isNonPersistentTopic) {
             msgDrop.calculateRate();
-            ((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getRate();
+            ((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getValueRate();
         }
     }
 


(pulsar) 01/06: [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a9e815d464ca8d4620ba6d676d74831a730d533c
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Apr 4 06:39:53 2024 -0700

    [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)
    
    (cherry picked from commit 5b6f91bc0f839c467bdc1af35c8eac7b14aa8822)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 50e96b4d35f..0f561597f19 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,7 +228,7 @@ flexible messaging model and an intuitive client API.</description>
     <hppc.version>0.9.1</hppc.version>
     <spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
     <assertj-core.version>3.24.2</assertj-core.version>
-    <lombok.version>1.18.30</lombok.version>
+    <lombok.version>1.18.32</lombok.version>
     <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
     <jaxb-api>2.3.1</jaxb-api>
     <javax.activation.version>1.2.0</javax.activation.version>


(pulsar) 02/06: [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 386f6f0bf73d01f7b23160e9e4642f4f7c01cac5
Author: hanmz <gu...@tencent.com>
AuthorDate: Wed Apr 10 04:27:22 2024 +0800

    [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)
    
    (cherry picked from commit fb5caeb2cd3353db0499e32e9ec79390741b809c)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  3 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 45 ++++++++++++++++++++++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c674a035653..ae6928d2b32 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1348,7 +1348,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
             category = CATEGORY_SERVER,
             dynamic = true,
             doc = "The number of partitions per partitioned topic.\n"
-                + "If try to create or update partitioned topics by exceeded number of partitions, then fail."
+                + "If try to create or update partitioned topics by exceeded number of partitions, then fail.\n"
+                + "Use 0 or negative number to disable the check."
     )
     private int maxNumPartitionsPerPartitionedTopic = 0;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 602cd47e595..dac3e7cd55d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -433,7 +433,7 @@ public class PersistentTopicsBase extends AdminResource {
                 }
                 int brokerMaximumPartitionsPerTopic = pulsarService.getConfiguration()
                         .getMaxNumPartitionsPerPartitionedTopic();
-                if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > brokerMaximumPartitionsPerTopic) {
+                if (brokerMaximumPartitionsPerTopic > 0 && expectPartitions > brokerMaximumPartitionsPerTopic) {
                     throw new RestException(422 /* Unprocessable entity*/,
                             String.format("Expect partitions %s grater than maximum partitions per topic %s",
                                     expectPartitions, brokerMaximumPartitionsPerTopic));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index d7ffa656bdb..0eddbf1fea1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1658,6 +1658,51 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
         partitionedTopicMetadata = metaCaptor.getValue();
         Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+        // test for configuration maxNumPartitionsPerPartitionedTopic
+        conf.setMaxNumPartitionsPerPartitionedTopic(4);
+        response = mock(AsyncResponse.class);
+        throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
+        persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
+                true, 5);
+        verify(response, timeout(5000).times(1)).resume(throwableCaptor.capture());
+        Assert.assertEquals(throwableCaptor.getValue().getMessage(),
+                "Desired partitions 5 can't be greater than the maximum partitions per topic 4.");
+
+        response = mock(AsyncResponse.class);
+        metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+        conf.setMaxNumPartitionsPerPartitionedTopic(-1);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
+                true, 5);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+
+        response = mock(AsyncResponse.class);
+        metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 5);
+
+        conf.setMaxNumPartitionsPerPartitionedTopic(0);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
+                true, 6);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+
+        response = mock(AsyncResponse.class);
+        metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+        verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+        partitionedTopicMetadata = metaCaptor.getValue();
+        Assert.assertEquals(partitionedTopicMetadata.partitions, 6);
     }
 
     @Test


(pulsar) 06/06: [improve] Make the config `metricsBufferResponse` description more effective (#22490)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 94f12543a9ffe7e96f9af7ef9108d1b849ca3280
Author: 道君 <da...@apache.org>
AuthorDate: Wed Apr 17 03:07:30 2024 +0800

    [improve] Make the config `metricsBufferResponse` description more effective (#22490)
    
    (cherry picked from commit 4ca4e2855267e3b36ee1a27f7144b89ba9194821)
---
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ae6928d2b32..0dc5d38c191 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2852,8 +2852,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean exposeTopicLevelMetricsInPrometheus = true;
     @FieldContext(
             category = CATEGORY_METRICS,
-            doc = "If true, export buffered metrics"
-    )
+            doc = "Set to true to enable the broker to cache the metrics response; the default is false. "
+                    + "The caching period is defined by `managedLedgerStatsPeriodSeconds`. "
+                    + "The broker returns the same response for subsequent requests within the same period. "
+                    + "Ensure that the scrape interval of your monitoring system matches the caching period.")
     private boolean metricsBufferResponse = false;
     @FieldContext(
         category = CATEGORY_METRICS,