You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 12:33:28 UTC

[pulsar] branch branch-2.7 updated (2f9f639 -> e9e415f)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 2f9f639  [Issue 8787][C++] Add reader internal subscription name setter. (#8823)
     new 636a749  add pulsar-perf new feature: one subscription has more than one consumers (#8837)
     new e9e415f  Issue 8882: GenericJsonReader converts the null value to string "null" (#8883)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impl/schema/generic/GenericJsonRecord.java     |  2 ++
 .../impl/schema/generic/GenericJsonRecordTest.java | 13 ++++++++++
 .../pulsar/testclient/PerformanceConsumer.java     | 29 ++++++++++++++++------
 3 files changed, 37 insertions(+), 7 deletions(-)


[pulsar] 02/02: Issue 8882: GenericJsonReader converts the null value to string "null" (#8883)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e9e415fe7cd1dcb0c00ff29eb0de5a9413041d06
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Dec 10 06:28:51 2020 +0100

    Issue 8882: GenericJsonReader converts the null value to string "null" (#8883)
    
    Describe the bug
    It looks like GenericJsonReader is not handling correctly null values
    
    Expected behavior
    The null value is not converted to a string, but it is still a null value
    
    Changes
    - Handle correctly null values
    - add test case
    
    Additional context
    The problem affects Pulsar Functions/Pulsar IO
    
    Fixes #8882
    
    (cherry picked from commit d9f3710006749c73471a80e1150a2d2d61c154f8)
---
 .../client/impl/schema/generic/GenericJsonRecord.java       |  2 ++
 .../client/impl/schema/generic/GenericJsonRecordTest.java   | 13 +++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
index 708b10e..85d3e8e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
@@ -90,6 +90,8 @@ public class GenericJsonRecord extends VersionedGenericRecord {
             } catch (IOException e) {
                 return fn.asText();
             }
+        } else if (fn.isNull()) {
+            return null;
         } else {
             return fn.asText();
         }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecordTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecordTest.java
index 0de3e3e..6486a24 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecordTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecordTest.java
@@ -25,13 +25,26 @@ import org.testng.annotations.Test;
 import java.util.Collections;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import org.apache.pulsar.client.api.schema.Field;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 
 public class GenericJsonRecordTest {
 
     @Test
+    public void decodeNullValue() throws Exception{
+        byte[] json = "{\"somefield\":null}".getBytes(UTF_8);
+        GenericJsonRecord record
+                = new GenericJsonReader(Collections.singletonList(new Field("somefield", 0)))
+                        .read(json, 0, json.length);
+        assertTrue(record.getJsonNode().get("somefield").isNull());
+        assertNull(record.getField("somefield"));
+    }
+
+
+    @Test
     public void decodeLongField() throws Exception{
         String jsonStr = "{\"timestamp\":1585204833128, \"count\":2, \"value\": 1.1, \"on\":true}";
         byte[] jsonStrBytes = jsonStr.getBytes();


[pulsar] 01/02: add pulsar-perf new feature: one subscription has more than one consumers (#8837)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 636a749ace9cce7888c36764c493f2f39f494504
Author: Gjiangtao <31...@users.noreply.github.com>
AuthorDate: Mon Dec 7 16:27:59 2020 +0800

    add pulsar-perf new feature: one subscription has more than one consumers (#8837)
    
    ### Motivation
    
    
    *The official performance test tool pulsar-perf now supports only one consumer per subscription, not multiple consumers per subscription.*
    
    ### Modifications
    
    - A new parameter `numSubscriptions` was added, which specifies the number of subscriptions per Topic
    - Change the definition of `numConsumers`: before: number of consumers per topic; After: the number of consumers per subscription
    - add new feature: one subscription has more than one consumers
    - add new parameter: `receiver-queue-size-across-partitions`, which means *Max total size of the receiver queue across partitions*
    
    (cherry picked from commit 78c8e3237f3e063cd61c1e22a0a56916e650851a)
---
 .../pulsar/testclient/PerformanceConsumer.java     | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 552b146..606262e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -81,9 +81,12 @@ public class PerformanceConsumer {
         @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
         public int numTopics = 1;
 
-        @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per topic)")
+        @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive")
         public int numConsumers = 1;
 
+        @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)")
+        public int numSubscriptions = 1;
+
         @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix")
         public String subscriberName = "sub";
 
@@ -99,6 +102,9 @@ public class PerformanceConsumer {
         @Parameter(names = { "-q", "--receiver-queue-size" }, description = "Size of the receiver queue")
         public int receiverQueueSize = 1000;
 
+        @Parameter(names = { "-p", "--receiver-queue-size-across-partitions" }, description = "Max total size of the receiver queue across partitions")
+        public int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+
         @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
         public boolean replicatedSubscription = false;
 
@@ -188,6 +194,12 @@ public class PerformanceConsumer {
             System.exit(-1);
         }
 
+        if (arguments.subscriptionType == SubscriptionType.Exclusive && arguments.numConsumers > 1) {
+            System.out.println("Only one consumer is allowed when subscriptionType is Exclusive");
+            jc.usage();
+            System.exit(-1);
+        }
+
         if (arguments.confFile != null) {
             Properties prop = new Properties(System.getProperties());
             prop.load(new FileInputStream(arguments.confFile));
@@ -306,6 +318,7 @@ public class PerformanceConsumer {
         ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() //
                 .messageListener(listener) //
                 .receiverQueueSize(arguments.receiverQueueSize) //
+                .maxTotalReceiverQueueSizeAcrossPartitions(arguments.maxTotalReceiverQueueSizeAcrossPartitions)
                 .acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
                 .subscriptionType(arguments.subscriptionType)
                 .subscriptionInitialPosition(arguments.subscriptionInitialPosition)
@@ -328,18 +341,20 @@ public class PerformanceConsumer {
         for (int i = 0; i < arguments.numTopics; i++) {
             final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName
                     : TopicName.get(String.format("%s-%d", prefixTopicName, i));
-            log.info("Adding {} consumers on topic {}", arguments.numConsumers, topicName);
+            log.info("Adding {} consumers per subscription on topic {}", arguments.numConsumers, topicName);
 
-            for (int j = 0; j < arguments.numConsumers; j++) {
+            for (int j = 0; j < arguments.numSubscriptions; j++) {
                 String subscriberName;
-                if (arguments.numConsumers > 1) {
+                if (arguments.numSubscriptions > 1) {
                     subscriberName = String.format("%s-%d", arguments.subscriberName, j);
                 } else {
                     subscriberName = arguments.subscriberName;
                 }
 
-                futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
-                        .subscribeAsync());
+                for (int k = 0; k < arguments.numConsumers; k++) {
+                    futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
+                            .subscribeAsync());
+                }
             }
         }
 
@@ -347,7 +362,7 @@ public class PerformanceConsumer {
             future.get();
         }
 
-        log.info("Start receiving from {} consumers on {} topics", arguments.numConsumers,
+        log.info("Start receiving from {} consumers per subscription on {} topics", arguments.numConsumers,
                 arguments.numTopics);
 
         long start = System.nanoTime();