Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/29 10:40:30 UTC

[pulsar] branch branch-2.8 updated (e5a5498 -> 9ec4f90)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from e5a5498  [Python Schema] Support setting namespace for python schema (#12175)
     new cd2dd06  [pulsar-client] Make it possible to disable poolMessages (#12108)
     new 90c3804  Fix deadLetterPolicy is not working with key shared subscription under partitioned topic (#12148)
     new e0d3d421 Remove the deprecated api usage in hdfs (#12080)
     new 519dbe1  [testclient] Call printAggregatedStats method before client exit (#11985)
     new 2fe6cba  [testclient] Add total messages when printing throughput (#12084)
     new 9f39cc6  Fix returned wrong hash ranges for the consumer with same consumer name (#12212)
     new f2cc522  Disable stats recorder for built-in PulsarClient (#12217)
     new 9ec4f90  [Python] Do not sort schema fields by default (#12232)

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    |  2 +
 ...ConsistentHashingStickyKeyConsumerSelector.java |  9 +-
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java |  9 +-
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |  9 +-
 .../broker/service/StickyKeyConsumerSelector.java  |  3 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  3 +-
 .../service/persistent/PersistentSubscription.java | 10 ++-
 ...istentHashingStickyKeyConsumerSelectorTest.java | 29 +++++--
 ...angeAutoSplitStickyKeyConsumerSelectorTest.java | 40 +++++++--
 ...angeExclusiveStickyKeyConsumerSelectorTest.java | 48 +++++++++--
 .../pulsar/client/api/DeadLetterTopicTest.java     | 96 ++++++++++++++++++++++
 .../java/org/apache/pulsar/client/api/Range.java   | 19 +++++
 .../python/pulsar/schema/definition.py             | 19 +++--
 pulsar-client-cpp/python/schema_test.py            | 38 +++++++++
 .../org/apache/pulsar/client/cli/CmdConsume.java   |  2 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |  3 +-
 .../pulsar/io/hdfs2/AbstractHdfsConnector.java     |  7 +-
 .../io/hdfs2/sink/seq/HdfsSequentialTextSink.java  |  2 +-
 .../pulsar/io/hdfs3/AbstractHdfsConnector.java     |  7 +-
 .../io/hdfs3/sink/seq/HdfsSequentialTextSink.java  |  2 +-
 .../proxy/socket/client/PerformanceClient.java     | 40 ++++++++-
 .../pulsar/testclient/ManagedLedgerWriter.java     | 34 ++++++--
 .../pulsar/testclient/PerformanceReader.java       |  6 +-
 23 files changed, 366 insertions(+), 71 deletions(-)

[pulsar] 02/08: Fix deadLetterPolicy is not working with key shared subscription under partitioned topic (#12148)


penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 90c3804ac5781e670e61353b02097d117cebacb3
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Sep 24 21:13:42 2021 +0800

    Fix deadLetterPolicy is not working with key shared subscription under partitioned topic (#12148)
    
    Fixes #11652.
    
    This is a bug fix, no need to update doc.
    
    (cherry picked from commit 7d4d8cc417105664eae66521e41e8f27cbbd5c87)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 96 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  3 +-
 2 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 7a65c16..645c767 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -519,4 +519,100 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
         assertNotNull(msg);
     }
+
+    @Test
+    public void testDeadLetterTopicUnderPartitionedTopicWithKeyShareType() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic";
+
+        final int maxRedeliveryCount = 2;
+
+        final int sendMessages = 1;
+
+        int partitionCount = 2;
+
+        admin.topics().createPartitionedTopic(topic, partitionCount);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer0 = pulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic-partition-0-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer1 = pulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic-partition-1-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer0.receive(3, TimeUnit.SECONDS);
+            if (message != null) {
+                log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+                deadLetterConsumer0.acknowledge(message);
+                totalInDeadLetter++;
+            } else {
+                break;
+            }
+        } while (totalInDeadLetter < sendMessages);
+
+        do {
+            Message message = deadLetterConsumer1.receive(3, TimeUnit.SECONDS);
+            if (message != null) {
+                log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+                deadLetterConsumer1.acknowledge(message);
+                totalInDeadLetter++;
+            } else {
+                break;
+            }
+        } while (totalInDeadLetter < sendMessages);
+
+        assertEquals(totalInDeadLetter, sendMessages);
+        deadLetterConsumer0.close();
+        deadLetterConsumer1.close();
+        consumer.close();
+
+        Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+
+        checkConsumer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3b06ffa..f31e6a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -649,7 +649,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
         checkArgument(messageIds.stream().findFirst().get() instanceof TopicMessageIdImpl);
 
-        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+        if (conf.getSubscriptionType() != SubscriptionType.Shared
+                && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
             // We cannot redeliver single messages if subscription type is not Shared
             redeliverUnacknowledgedMessages();
             return;
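
As a reading aid, the guard changed in the hunk above can be restated as a predicate: before the patch only `Shared` skipped the fallback to `redeliverUnacknowledgedMessages()`, and after it `Key_Shared` does too. The sketch below is an illustration only. The enum is a local stand-in that mirrors the names of Pulsar's `SubscriptionType`, and the class and method names are hypothetical, not part of pulsar-client.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative stand-in only; not code from pulsar-client.
final class RedeliveryGuardSketch {

    // Local copy of the subscription type names so the sketch compiles on its own.
    enum SubscriptionType { Exclusive, Shared, Failover, Key_Shared }

    // After the fix, both Shared and Key_Shared skip the "redeliver everything" fallback.
    private static final Set<SubscriptionType> PER_MESSAGE_REDELIVERY =
            EnumSet.of(SubscriptionType.Shared, SubscriptionType.Key_Shared);

    // Equivalent to the negation of the patched `if` condition in the diff.
    static boolean redeliversSelectedMessagesOnly(SubscriptionType type) {
        return PER_MESSAGE_REDELIVERY.contains(type);
    }
}
```

With this predicate, `Exclusive` and `Failover` still take the redeliver-all path, which matches the unchanged behavior in the diff.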

[pulsar] 04/08: [testclient] Call printAggregatedStats method before client exit (#11985)


penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 519dbe1eacd09ab98825920bbe5f7c6cf27e2c1a
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Sep 20 20:34:06 2021 +0800

    [testclient] Call printAggregatedStats method before client exit (#11985)
    
    
    (cherry picked from commit 3c770a18a807498634124161f95bc4f0888d5315)
---
 .../proxy/socket/client/PerformanceClient.java     | 32 ++++++++++++++++++++++
 .../pulsar/testclient/ManagedLedgerWriter.java     | 26 ++++++++++++++----
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 3902d5f..0e6f84b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -67,6 +67,8 @@ public class PerformanceClient {
     static AtomicInteger msgSent = new AtomicInteger(0);
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
+    private static final LongAdder totalMessagesSent = new LongAdder();
+    private static final LongAdder totalBytesSent = new LongAdder();
     private JCommander jc;
 
     @Parameters(commandDescription = "Test pulsar websocket producer performance.")
@@ -263,6 +265,8 @@ public class PerformanceClient {
                         producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), sizeOfMessage);
                         messagesSent.increment();
                         bytesSent.add(sizeOfMessage);
+                        totalMessagesSent.increment();
+                        totalBytesSent.add(sizeOfMessage);
                     }
                 }
 
@@ -328,6 +332,11 @@ public class PerformanceClient {
         PerformanceClient test = new PerformanceClient();
         Arguments arguments = test.loadArguments(args);
         PerfClientUtils.printJVMInformation(log);
+        long start = System.nanoTime();
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            printAggregatedThroughput(start);
+            printAggregatedStats();
+        }));
         test.runPerformanceTest(arguments.numMessages, arguments.msgRate, arguments.numTopics, arguments.msgSize,
                 arguments.proxyURL, arguments.topics.get(0), arguments.authPluginClassName, arguments.authParams);
     }
@@ -350,8 +359,31 @@ public class PerformanceClient {
 
     }
 
+    private static void printAggregatedThroughput(long start) {
+        double elapsed = (System.nanoTime() - start) / 1e9;
+        double rate = totalMessagesSent.sum() / elapsed;
+        double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+        log.info(
+                "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+                totalMessagesSent,
+                totalFormat.format(rate),
+                totalFormat.format(throughput));
+    }
+
+    private static void printAggregatedStats() {
+        Histogram reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram();
+
+        log.info(
+                "Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean()), reportHistogram.getValueAtPercentile(50),
+                reportHistogram.getValueAtPercentile(95), reportHistogram.getValueAtPercentile(99),
+                reportHistogram.getValueAtPercentile(99.9), reportHistogram.getValueAtPercentile(99.99),
+                reportHistogram.getValueAtPercentile(99.999), reportHistogram.getMaxValue());
+    }
+
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(PerformanceClient.class);
 
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 8d4c8c2..244be81 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -72,6 +72,8 @@ public class ManagedLedgerWriter {
 
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
+    private static final LongAdder totalMessagesSent = new LongAdder();
+    private static final LongAdder totalBytesSent = new LongAdder();
 
     private static Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
     private static Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
@@ -211,11 +213,11 @@ public class ManagedLedgerWriter {
 
         log.info("Created {} managed ledgers", managedLedgers.size());
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                printAggregatedStats();
-            }
-        });
+        long start = System.nanoTime();
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            printAggregatedThroughput(start);
+            printAggregatedStats();
+        }));
 
         Collections.shuffle(managedLedgers);
         AtomicBoolean isDone = new AtomicBoolean();
@@ -245,6 +247,8 @@ public class ManagedLedgerWriter {
                             long sendTime = (Long) (ctx);
                             messagesSent.increment();
                             bytesSent.add(payloadData.length);
+                            totalMessagesSent.increment();
+                            totalBytesSent.add(payloadData.length);
 
                             long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime);
                             recorder.recordValue(latencyMicros);
@@ -376,6 +380,17 @@ public class ManagedLedgerWriter {
         return map;
     }
 
+    private static void printAggregatedThroughput(long start) {
+        double elapsed = (System.nanoTime() - start) / 1e9;
+        double rate = totalMessagesSent.sum() / elapsed;
+        double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+        log.info(
+                "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+                totalMessagesSent,
+                totalFormat.format(rate),
+                totalFormat.format(throughput));
+    }
+
     private static void printAggregatedStats() {
         Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
 
@@ -393,5 +408,6 @@ public class ManagedLedgerWriter {
 
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerWriter.class);
 }
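
The arithmetic in `printAggregatedThroughput` can be checked in isolation. The sketch below repeats the same formulas from the diff (elapsed nanoseconds to seconds, bytes to Mbit/s via `/ 1024 / 1024 * 8`) in a standalone class. The class and method names are hypothetical, and pinning the formatter to `Locale.ROOT` is an assumption made here for reproducible output; the patch itself uses `new DecimalFormat("0.000")` with the default locale.

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

// Illustrative restatement of the throughput formulas; not the testclient code itself.
final class AggregatedThroughputSketch {

    // Messages per second over the whole run.
    static double messagesPerSecond(long totalMessages, long elapsedNanos) {
        double elapsedSeconds = elapsedNanos / 1e9;
        return totalMessages / elapsedSeconds;
    }

    // Bytes per second converted to megabits per second, as in the diff.
    static double megabitsPerSecond(long totalBytes, long elapsedNanos) {
        double elapsedSeconds = elapsedNanos / 1e9;
        return totalBytes / elapsedSeconds / 1024 / 1024 * 8;
    }

    // Three decimal places; Locale.ROOT is an assumption to keep '.' as the separator.
    static String format(double value) {
        return new DecimalFormat("0.000", DecimalFormatSymbols.getInstance(Locale.ROOT)).format(value);
    }
}
```

For example, 1,048,576 bytes sent in one second yields 8.000 Mbit/s, and 1,000 messages in two seconds yields 500.000 msg/s.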

[pulsar] 08/08: [Python] Do not sort schema fields by default (#12232)


penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ec4f909625036ea7bb67eac9fcda620418cd027
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Sep 29 04:16:08 2021 -0600

    [Python] Do not sort schema fields by default (#12232)
    
    ### Motivation
    
    In an Avro schema, the order of fields is used in the validation process. Sorting the fields therefore generates an unexpected schema for a Python producer/consumer and makes it non-interoperable with Java and other clients.
    
    (cherry picked from commit 2f3ad4d369e8a2ae558c6f9ee85f0b407e5e78b2)
---
 .../python/pulsar/schema/definition.py             | 19 ++++++++---
 pulsar-client-cpp/python/schema_test.py            | 38 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 9335176..fd778f3 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -60,6 +60,9 @@ class Record(with_metaclass(RecordMeta, object)):
     # This field is used to set namespace for Avro Record schema.
     _avro_namespace = None
 
+    # Generate a schema where fields are sorted alphabetically
+    _sorted_fields = False
+
     def __init__(self, default=None, required_default=False, required=False, *args, **kwargs):
         self._required_default = required_default
         self._default = default
@@ -114,20 +117,26 @@ class Record(with_metaclass(RecordMeta, object)):
 
         defined_names.add(namespace_name)
 
-        schema = {'name': str(cls.__name__)}
+        schema = {
+            'type': 'record',
+            'name': str(cls.__name__)
+        }
         if cls._avro_namespace is not None:
             schema['namespace'] = cls._avro_namespace
-        schema['type'] = 'record'
         schema['fields'] = []
 
-        for name in sorted(cls._fields.keys()):
+        if cls._sorted_fields:
+            fields = sorted(cls._fields.keys())
+        else:
+            fields = cls._fields.keys()
+        for name in fields:
             field = cls._fields[name]
             field_type = field.schema_info(defined_names) \
                 if field._required else ['null', field.schema_info(defined_names)]
             schema['fields'].append({
                 'name': name,
-                'type': field_type,
-                'default': field.default()
+                'default': field.default(),
+                'type': field_type
             }) if field.required_default() else schema['fields'].append({
                 'name': name,
                 'type': field_type,
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 40497ad..7adbcbe 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -38,6 +38,7 @@ class SchemaTest(TestCase):
             blue = 3
 
         class Example(Record):
+            _sorted_fields = True
             a = String()
             b = Integer()
             c = Array(String())
@@ -78,11 +79,13 @@ class SchemaTest(TestCase):
 
     def test_complex(self):
         class MySubRecord(Record):
+            _sorted_fields = True
             x = Integer()
             y = Long()
             z = String()
 
         class Example(Record):
+            _sorted_fields = True
             a = String()
             sub = MySubRecord     # Test with class
             sub2 = MySubRecord()  # Test with instance
@@ -348,6 +351,34 @@ class SchemaTest(TestCase):
         self.assertEqual(r2.__class__.__name__, 'Example')
         self.assertEqual(r2, r)
 
+    def test_non_sorted_fields(self):
+        class T1(Record):
+            a = Integer()
+            b = Integer()
+            c = Double()
+            d = String()
+
+        class T2(Record):
+            b = Integer()
+            a = Integer()
+            d = String()
+            c = Double()
+
+        self.assertNotEqual(T1.schema()['fields'], T2.schema()['fields'])
+
+    def test_sorted_fields(self):
+        class T1(Record):
+            _sorted_fields = True
+            a = Integer()
+            b = Integer()
+
+        class T2(Record):
+            _sorted_fields = True
+            b = Integer()
+            a = Integer()
+
+        self.assertEqual(T1.schema()['fields'], T2.schema()['fields'])
+
     def test_schema_version(self):
         class Example(Record):
             a = Integer()
@@ -691,6 +722,7 @@ class SchemaTest(TestCase):
 
     def test_avro_required_default(self):
         class MySubRecord(Record):
+            _sorted_fields = True
             x = Integer()
             y = Long()
             z = String()
@@ -707,7 +739,9 @@ class SchemaTest(TestCase):
             i = Map(String())
             j = MySubRecord()
 
+
         class ExampleRequiredDefault(Record):
+            _sorted_fields = True
             a = Integer(required_default=True)
             b = Boolean(required=True, required_default=True)
             c = Long(required_default=True)
@@ -879,10 +913,12 @@ class SchemaTest(TestCase):
 
     def test_serialize_schema_complex(self):
         class NestedObj1(Record):
+            _sorted_fields = True
             na1 = String()
             nb1 = Double()
 
         class NestedObj2(Record):
+            _sorted_fields = True
             na2 = Integer()
             nb2 = Boolean()
             nc2 = NestedObj1()
@@ -892,6 +928,7 @@ class SchemaTest(TestCase):
 
         class NestedObj4(Record):
             _avro_namespace = 'xxx4'
+            _sorted_fields = True
             na4 = String()
             nb4 = Integer()
 
@@ -902,6 +939,7 @@ class SchemaTest(TestCase):
 
         class ComplexRecord(Record):
             _avro_namespace = 'xxx.xxx'
+            _sorted_fields = True
             a = Integer()
             b = Integer()
             color = Color
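
The effect of the new `_sorted_fields` flag can be illustrated outside the Python client. The sketch below is written in Java to match the rest of this thread and is not the client's implementation: it only shows that two records declaring the same fields in a different order produce different field lists unless sorting is requested, which is what `test_non_sorted_fields` and `test_sorted_fields` assert. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative model of the field-ordering choice; not the Python client code.
final class FieldOrderSketch {

    // Returns field names in declaration order, or alphabetically when sortedFields is true.
    static List<String> schemaFieldNames(List<String> declaredFields, boolean sortedFields) {
        List<String> names = new ArrayList<>(declaredFields);
        if (sortedFields) {
            Collections.sort(names);
        }
        return names;
    }
}
```

With declaration orders `a, b, c, d` and `b, a, d, c`, the two results differ when `sortedFields` is false and are identical when it is true.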

[pulsar] 03/08: Remove the deprecated api usage in hdfs (#12080)


penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e0d3d4218a158ba8d76d582e014d7291a824b05c
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Sun Sep 26 09:23:38 2021 +0800

    Remove the deprecated api usage in hdfs (#12080)
    
    ### Motivation
    Remove the deprecated api usage in hdfs
    
    ### Modifications
    Use try-with-resources instead of closeQuietly
    Remove the Long constructor
    
    * Remove the deprecated api usage in hdfs
    
    * Update pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
    
    Co-authored-by: Yunze Xu <xy...@163.com>
    (cherry picked from commit f8220166a1b9af09a35d926adde90955be225af1)
---
 .../java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java     | 7 +------
 .../apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java    | 2 +-
 .../java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java     | 7 +------
 .../apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java    | 2 +-
 4 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
index f1aab8b..456d0e5 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -138,12 +137,8 @@ public abstract class AbstractHdfsConnector {
         }
         InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
         SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
-        Socket socket = null;
-        try {
-            socket = socketFactory.createSocket();
+        try (Socket socket = socketFactory.createSocket()) {
             NetUtils.connect(socket, namenode, 1000); // 1 second timeout
-        } finally {
-            IOUtils.closeQuietly(socket);
         }
     }
 
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
index 88aee08..a3687b8 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
@@ -60,7 +60,7 @@ public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, S
 
     @Override
     public KeyValue<Long, String> extractKeyValue(Record<String> record) {
-       Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet()));
+       Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet());
        return new KeyValue<>(sequence, record.getValue());
     }
 
diff --git a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
index 28e0bd4..dfe8833 100644
--- a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -138,12 +137,8 @@ public abstract class AbstractHdfsConnector {
         }
         InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
         SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
-        Socket socket = null;
-        try {
-            socket = socketFactory.createSocket();
+        try (Socket socket = socketFactory.createSocket()){
             NetUtils.connect(socket, namenode, 1000); // 1 second timeout
-        } finally {
-            IOUtils.closeQuietly(socket);
         }
     }
 
diff --git a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
index cca5398..d63f360 100644
--- a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
@@ -60,7 +60,7 @@ public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, S
 
     @Override
     public KeyValue<Long, String> extractKeyValue(Record<String> record) {
-       Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet()));
+       Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet());
        return new KeyValue<>(sequence, record.getValue());
     }
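
The try-with-resources pattern adopted in `AbstractHdfsConnector` can be demonstrated with a stand-in resource. The sketch below is a minimal illustration, assuming a hypothetical `Probe` class in place of `java.net.Socket`; it shows that the resource is closed whether or not the body throws. One difference worth keeping in mind: `IOUtils.closeQuietly` swallows exceptions raised by `close()`, whereas try-with-resources lets them propagate (or attaches them as suppressed exceptions).

```java
import java.io.IOException;

// Illustrative stand-in for the socket probe; not the connector code.
final class TryWithResourcesSketch {

    // Hypothetical resource that records whether close() was called.
    static final class Probe implements AutoCloseable {
        private boolean closed;

        void connect(boolean fail) throws IOException {
            if (fail) {
                throw new IOException("simulated connect failure");
            }
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Returns true if the probe was closed after the try block, with or without a failure.
    static boolean closedAfterUse(boolean failOnConnect) {
        Probe probe = new Probe();
        try (Probe p = probe) {
            p.connect(failOnConnect);
        } catch (IOException e) {
            // The resource has already been closed when this handler runs.
        }
        return probe.isClosed();
    }
}
```

Both `closedAfterUse(false)` and `closedAfterUse(true)` return `true`, so no explicit `finally` block is needed.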
 

[pulsar] 07/08: Disable stats recorder for built-in PulsarClient (#12217)


penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f2cc522411875273edabf112ff3c8d6f461b52a4
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Sep 28 22:14:08 2021 +0800

    Disable stats recorder for built-in PulsarClient (#12217)
    
    ### Motivation
    
    In `PulsarService`, there is a built-in client used in many places, such as topic compaction and system topic reads/writes. It can also be used in protocol handlers. Unlike a manually created Pulsar client, it reads the configured authentication parameters and TLS-related settings for broker-to-broker communication.
    
    However, it does not change the default `statsIntervalSeconds` setting, so each time a producer/consumer/reader is created, a stats recorder (`ProducerStatsRecorderImpl` or `ConsumerStatsRecorderImpl`) is created as well, with a Netty `TimerTask` that prints stats periodically (the default interval is 1 minute). If this built-in client creates many producers/consumers/readers, the unnecessary CPU usage becomes high.
    
    ### Modifications
    
    Set `statsIntervalSeconds` to zero to disable the stats recorder.
    
    (cherry picked from commit 52232e6e78e2ebf7a2d04489fd116c7f3abe7901)
---
 pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index eb01fa4..f3eae1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1370,6 +1370,8 @@ public class PulsarService implements AutoCloseable {
                             this.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             this.getConfiguration().getBrokerClientAuthenticationParameters()));
                 }
+
+                conf.setStatsIntervalSeconds(0);
                 this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
             } catch (Exception e) {
                 throw new PulsarServerException(e);

[pulsar] 06/08: Fix returned wrong hash ranges for the consumer with same consumer name (#12212)


penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9f39cc6fedaaf60f8bc3760dfcc44533b38d879f
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Sep 28 19:46:13 2021 +0800

    Fix returned wrong hash ranges for the consumer with same consumer name (#12212)
    
    Currently, we use the consumer name as the key when generating the hash ranges returned to the admin client.
    If several consumers share the same name, different consumers are reported with the same hash ranges,
    which is confusing when troubleshooting an issue. The following is an example:
    
    ```
    "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "bytesOutCounter" : 46320,
            "msgOutCounter" : 1020,
            "msgRateRedeliver" : 0.0,
            "chunkedMessageRate" : 0.0,
            "consumerName" : "5253f",
            "availablePermits" : -20,
            "unackedMessages" : 1000,
            "avgMessagesPerEntry" : 56,
            "blockedConsumerOnUnackedMsgs" : false,
            "readPositionWhenJoining" : "10:11494",
            "lastAckedTimestamp" : 1632731049993,
            "lastConsumedTimestamp" : 1632731030268,
            "keyHashRanges" : [ "[0, 16384]" ],
            "metadata" : { },
            "address" : "/127.0.0.1:54702",
            "connectedSince" : "2021-09-27T16:23:49.891+08:00",
            "clientVersion" : "2.8.1"
          }, {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "bytesOutCounter" : 0,
            "msgOutCounter" : 0,
            "msgRateRedeliver" : 0.0,
            "chunkedMessageRate" : 0.0,
            "consumerName" : "my-name",
            "availablePermits" : 10,
            "unackedMessages" : 0,
            "avgMessagesPerEntry" : 1000,
            "blockedConsumerOnUnackedMsgs" : false,
            "readPositionWhenJoining" : "10:19505",
            "lastAckedTimestamp" : 0,
            "lastConsumedTimestamp" : 0,
            "keyHashRanges" : [ "[16385, 40960]", "[40961, 65536]" ],
            "metadata" : { },
            "address" : "/127.0.0.1:54708",
            "connectedSince" : "2021-09-27T16:23:59.031+08:00",
            "clientVersion" : "2.8.1"
          }, {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "bytesOutCounter" : 0,
            "msgOutCounter" : 0,
            "msgRateRedeliver" : 0.0,
            "chunkedMessageRate" : 0.0,
            "consumerName" : "my-name",
            "availablePermits" : 10,
            "unackedMessages" : 0,
            "avgMessagesPerEntry" : 1000,
            "blockedConsumerOnUnackedMsgs" : false,
            "readPositionWhenJoining" : "10:19514",
            "lastAckedTimestamp" : 0,
            "lastConsumedTimestamp" : 0,
            "keyHashRanges" : [ "[16385, 40960]", "[40961, 65536]" ],
            "metadata" : { },
            "address" : "/127.0.0.1:54717",
            "connectedSince" : "2021-09-27T16:24:03.927+08:00",
            "clientVersion" : "2.8.1"
          } ],
    ```
    
    The fix is to key the hash ranges by the consumer itself (via its `equals` method) instead of by the consumer name.
    New tests are added.
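    
    The difference between the two keying strategies can be sketched as follows. This is a simplified model, not the broker code: the nested `Consumer` class, the fixed `rangeSize`, and both method names are hypothetical, and the consumer is assumed to use identity-based equality.
    
    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class KeyHashRangeSketch {
        // Hypothetical minimal consumer; equality is object identity (an assumption).
        static final class Consumer {
            final String name;

            Consumer(String name) {
                this.name = name;
            }
        }

        // Keyed by name: ranges of same-named consumers collapse into one entry.
        static Map<String, List<String>> rangesByName(List<Consumer> consumers, int rangeSize) {
            Map<String, List<String>> result = new LinkedHashMap<>();
            int start = 0;
            for (Consumer consumer : consumers) {
                int end = start + rangeSize - 1;
                result.computeIfAbsent(consumer.name, key -> new ArrayList<>())
                        .add("[" + start + ", " + end + "]");
                start = end + 1;
            }
            return result;
        }

        // Keyed by the consumer object: every consumer keeps its own ranges.
        static Map<Consumer, List<String>> rangesByConsumer(List<Consumer> consumers, int rangeSize) {
            Map<Consumer, List<String>> result = new LinkedHashMap<>();
            int start = 0;
            for (Consumer consumer : consumers) {
                int end = start + rangeSize - 1;
                result.computeIfAbsent(consumer, key -> new ArrayList<>())
                        .add("[" + start + ", " + end + "]");
                start = end + 1;
            }
            return result;
        }
    }
    ```
    
    With two consumers named `my-name`, the name-keyed map has a single entry holding both ranges, which matches the duplicated `keyHashRanges` in the stats output above. The consumer-keyed map keeps one range per consumer.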
    
    (cherry picked from commit 9abd6d30f39f74e255cd5dac40b77af3dce5c468)
---
 ...ConsistentHashingStickyKeyConsumerSelector.java |  9 ++--
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java |  9 ++--
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |  9 ++--
 .../broker/service/StickyKeyConsumerSelector.java  |  3 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  3 +-
 .../service/persistent/PersistentSubscription.java | 10 +++--
 ...istentHashingStickyKeyConsumerSelectorTest.java | 29 +++++++++----
 ...angeAutoSplitStickyKeyConsumerSelectorTest.java | 40 ++++++++++++++----
 ...angeExclusiveStickyKeyConsumerSelectorTest.java | 48 ++++++++++++++++++----
 .../java/org/apache/pulsar/client/api/Range.java   | 19 +++++++++
 10 files changed, 139 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index f0f6431..7b7a830 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 /**
@@ -126,15 +127,15 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
     }
 
     @Override
-    public Map<String, List<String>> getConsumerKeyHashRanges() {
-        Map<String, List<String>> result = new LinkedHashMap<>();
+    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
+        Map<Consumer, List<Range>> result = new LinkedHashMap<>();
         rwLock.readLock().lock();
         try {
             int start = 0;
             for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) {
                 for (Consumer consumer: entry.getValue()) {
-                    result.computeIfAbsent(consumer.consumerName(), key -> new ArrayList<>())
-                            .add("[" + start + ", " + entry.getKey() + "]");
+                    result.computeIfAbsent(consumer, key -> new ArrayList<>())
+                            .add(Range.of(start, entry.getKey()));
                 }
                 start = entry.getKey() + 1;
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index 4f721c9..624108f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
 
 /**
  * This is a consumer selector based fixed hash range.
@@ -112,12 +113,12 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
     }
 
     @Override
-    public Map<String, List<String>> getConsumerKeyHashRanges() {
-        Map<String, List<String>> result = new HashMap<>();
+    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
+        Map<Consumer, List<Range>> result = new HashMap<>();
         int start = 0;
         for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
-            result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
-                    .add("[" + start + ", " + entry.getKey() + "]");
+            result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>())
+                    .add(Range.of(start, entry.getKey()));
             start = entry.getKey() + 1;
         }
         return result;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index f4909ae..7c49632 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 
@@ -65,16 +66,16 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
     }
 
     @Override
-    public Map<String, List<String>> getConsumerKeyHashRanges() {
-        Map<String, List<String>> result = new HashMap<>();
+    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
+        Map<Consumer, List<Range>> result = new HashMap<>();
         Map.Entry<Integer, Consumer> prev = null;
         for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
             if (prev == null) {
                 prev = entry;
             } else {
                 if (prev.getValue().equals(entry.getValue())) {
-                    result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
-                            .add("[" + prev.getKey() + ", " + entry.getKey() + "]");
+                    result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>())
+                            .add(Range.of(prev.getKey(), entry.getKey()));
                 }
                 prev = null;
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 83952fc..b129ef5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import java.util.List;
 import java.util.Map;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 public interface StickyKeyConsumerSelector {
@@ -66,5 +67,5 @@ public interface StickyKeyConsumerSelector {
      * Get key hash ranges handled by each consumer.
      * @return A map where key is a consumer name and value is list of hash range it receiving message for.
      */
-    Map<String, List<String>> getConsumerKeyHashRanges();
+    Map<Consumer, List<Range>> getConsumerKeyHashRanges();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index e62f63e..9a036f4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelec
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -418,7 +419,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return recentlyJoinedConsumers;
     }
 
-    public Map<String, List<String>> getConsumerKeyHashRanges() {
+    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
         return selector.getConsumerKeyHashRanges();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 1eb99e9..b127b1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -1019,7 +1021,7 @@ public class PersistentSubscription implements Subscription {
         subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
         Dispatcher dispatcher = this.dispatcher;
         if (dispatcher != null) {
-            Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared
+            Map<Consumer, List<Range>> consumerKeyHashRanges = getType() == SubType.Key_Shared
                     ? ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getConsumerKeyHashRanges() : null;
             dispatcher.getConsumers().forEach(consumer -> {
                 ConsumerStatsImpl consumerStats = consumer.getStats();
@@ -1034,8 +1036,10 @@ public class PersistentSubscription implements Subscription {
                 subStats.lastConsumedTimestamp =
                         Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
                 subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
-                if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer.consumerName())) {
-                    consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer.consumerName());
+                if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) {
+                    consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream()
+                            .map(Range::toString)
+                            .collect(Collectors.toList());
                 }
             });
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 29b40c2..e058492 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -24,9 +24,11 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -146,22 +148,31 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
     public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
         ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3");
+        List<Consumer> consumers = new ArrayList<>();
         for (String s : consumerName) {
             Consumer consumer = mock(Consumer.class);
             when(consumer.consumerName()).thenReturn(s);
             selector.addConsumer(consumer);
+            consumers.add(consumer);
         }
-
-        Map<String, Set<String>> expectedResult = new HashMap<>();
-        expectedResult.put("consumer1", ImmutableSet.of("[0, 330121749]", "[330121750, 618146114]", "[1797637922, 1976098885]"));
-        expectedResult.put("consumer2", ImmutableSet.of("[938427576, 1094135919]", "[1138613629, 1342907082]", "[1342907083, 1797637921]"));
-        expectedResult.put("consumer3", ImmutableSet.of("[618146115, 772640562]", "[772640563, 938427575]", "[1094135920, 1138613628]"));
-        for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
-            Assert.assertEquals(new HashSet<>(entry.getValue()), expectedResult.get(entry.getKey()));
+        Map<Consumer, List<Range>> expectedResult = new HashMap<>();
+        expectedResult.put(consumers.get(0), Arrays.asList(
+                Range.of(0, 330121749),
+                Range.of(330121750, 618146114),
+                Range.of(1797637922, 1976098885)));
+        expectedResult.put(consumers.get(1), Arrays.asList(
+                Range.of(938427576, 1094135919),
+                Range.of(1138613629, 1342907082),
+                Range.of(1342907083, 1797637921)));
+        expectedResult.put(consumers.get(2), Arrays.asList(
+                Range.of(618146115, 772640562),
+                Range.of(772640563, 938427575),
+                Range.of(1094135920, 1138613628)));
+        for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
+            System.out.println(entry.getValue());
+            Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
             expectedResult.remove(entry.getKey());
         }
         Assert.assertEquals(expectedResult.size(), 0);
     }
-
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
index cb4b809..f17bb5a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.service;
 
-import com.google.common.collect.ImmutableList;
+import org.apache.pulsar.client.api.Range;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,22 +39,46 @@ public class HashRangeAutoSplitStickyKeyConsumerSelectorTest {
     public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
         HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
+        List<Consumer> consumers = new ArrayList<>();
         for (String s : consumerName) {
             Consumer consumer = mock(Consumer.class);
             when(consumer.consumerName()).thenReturn(s);
             selector.addConsumer(consumer);
+            consumers.add(consumer);
         }
 
-        Map<String, List<String>> expectedResult = new HashMap<>();
-        expectedResult.put("consumer1", ImmutableList.of("[49, 64]"));
-        expectedResult.put("consumer4", ImmutableList.of("[33, 48]"));
-        expectedResult.put("consumer2", ImmutableList.of("[17, 32]"));
-        expectedResult.put("consumer3", ImmutableList.of("[0, 16]"));
-        for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
+        Map<Consumer, List<Range>> expectedResult = new HashMap<>();
+        expectedResult.put(consumers.get(0), Collections.singletonList(Range.of(49, 64)));
+        expectedResult.put(consumers.get(3), Collections.singletonList(Range.of(33, 48)));
+        expectedResult.put(consumers.get(1), Collections.singletonList(Range.of(17, 32)));
+        expectedResult.put(consumers.get(2), Collections.singletonList(Range.of(0, 16)));
+        for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
             Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
             expectedResult.remove(entry.getKey());
         }
         Assert.assertEquals(expectedResult.size(), 0);
     }
 
+    @Test
+    public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception {
+        HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
+        final String consumerName = "My-consumer";
+        List<Consumer> consumers = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            Consumer consumer = mock(Consumer.class);
+            when(consumer.consumerName()).thenReturn(consumerName);
+            selector.addConsumer(consumer);
+            consumers.add(consumer);
+        }
+
+        List<Range> prev = null;
+        for (Consumer consumer : consumers) {
+            List<Range> ranges = selector.getConsumerKeyHashRanges().get(consumer);
+            Assert.assertEquals(ranges.size(), 1);
+            if (prev != null) {
+                Assert.assertNotEquals(prev, ranges);
+            }
+            prev = ranges;
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
index c63e57e..dde0a58 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
@@ -21,15 +21,16 @@ package org.apache.pulsar.broker.service;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -117,6 +118,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
         HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
         List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12}, new int[] {15, 20});
+        List<Consumer> consumers = new ArrayList<>();
         for (int index = 0; index < consumerName.size(); index++) {
             Consumer consumer = mock(Consumer.class);
             KeySharedMeta keySharedMeta = new KeySharedMeta()
@@ -128,14 +130,15 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
             when(consumer.consumerName()).thenReturn(consumerName.get(index));
             Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
             selector.addConsumer(consumer);
+            consumers.add(consumer);
         }
 
-        Map<String, List<String>> expectedResult = new HashMap<>();
-        expectedResult.put("consumer1", ImmutableList.of("[0, 2]"));
-        expectedResult.put("consumer2", ImmutableList.of("[3, 7]"));
-        expectedResult.put("consumer3", ImmutableList.of("[9, 12]"));
-        expectedResult.put("consumer4", ImmutableList.of("[15, 20]"));
-        for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
+        Map<Consumer, List<Range>> expectedResult = new HashMap<>();
+        expectedResult.put(consumers.get(0), Collections.singletonList(Range.of(0, 2)));
+        expectedResult.put(consumers.get(1), Collections.singletonList(Range.of(3, 7)));
+        expectedResult.put(consumers.get(2), Collections.singletonList(Range.of(9, 12)));
+        expectedResult.put(consumers.get(3), Collections.singletonList(Range.of(15, 20)));
+        for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
             Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
             expectedResult.remove(entry.getKey());
         }
@@ -143,6 +146,37 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
     }
 
     @Test
+    public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception {
+        HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
+        final String consumerName = "My-consumer";
+        List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12});
+        List<Consumer> consumers = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            Consumer consumer = mock(Consumer.class);
+            KeySharedMeta keySharedMeta = new KeySharedMeta()
+                    .setKeySharedMode(KeySharedMode.STICKY);
+            keySharedMeta.addHashRange()
+                    .setStart(range.get(i)[0])
+                    .setEnd(range.get(i)[1]);
+            when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta);
+            when(consumer.consumerName()).thenReturn(consumerName);
+            Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
+            selector.addConsumer(consumer);
+            consumers.add(consumer);
+        }
+
+        List<Range> prev = null;
+        for (Consumer consumer : consumers) {
+            List<Range> ranges = selector.getConsumerKeyHashRanges().get(consumer);
+            Assert.assertEquals(ranges.size(), 1);
+            if (prev != null) {
+                Assert.assertNotEquals(prev, ranges);
+            }
+            prev = ranges;
+        }
+    }
+
+    @Test
     public void testSingleRangeConflict() throws BrokerServiceException.ConsumerAssignException {
         HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
         Consumer consumer1 = mock(Consumer.class);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
index 57a408b..75b1cbc 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.client.api;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
+import java.util.Objects;
+
 /**
  * Int range.
  */
@@ -63,6 +65,23 @@ public class Range {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Range range = (Range) o;
+        return start == range.start && end == range.end;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(start, end);
+    }
+
+    @Override
     public String toString() {
         return "[" + start + ", " + end + "]";
     }

[pulsar] 05/08: [testclient] Add total messages when printing throughput (#12084)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2fe6cba648175fdf8ca26a2c4d986089bd2922a3
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Tue Sep 28 15:44:51 2021 +0800

    [testclient] Add total messages when printing throughput (#12084)
    
    ### Motivation
    Add the total number of messages read/produced to the periodic throughput output of `PerformanceReader`, `ManagedLedgerWriter`, and `PerformanceClient`.
    
    ### Modifications
    - Get `totalMessages` and add it to the log output
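    
    A minimal sketch of the reporting pattern, assuming two `LongAdder` counters as in the test clients. The class name `ThroughputReportSketch`, the `recordSend` helper, and the shortened log format are hypothetical:
    
    ```java
    import java.util.Locale;
    import java.util.concurrent.atomic.LongAdder;

    class ThroughputReportSketch {
        // Reset on every report, so it only covers the last interval.
        private final LongAdder messagesSent = new LongAdder();
        // Never reset, so it holds the running total.
        private final LongAdder totalMessagesSent = new LongAdder();

        void recordSend() {
            messagesSent.increment();
            totalMessagesSent.increment();
        }

        String report(double elapsedSeconds) {
            // sum() leaves the counter untouched; sumThenReset() starts a new interval.
            long total = totalMessagesSent.sum();
            double rate = messagesSent.sumThenReset() / elapsedSeconds;
            return String.format(Locale.ROOT,
                    "Throughput produced: %d msg --- %.1f msg/s", total, rate);
        }
    }
    ```
    
    Reading the total with `sum()` rather than `sumThenReset()` is what keeps it cumulative across reports, while the per-interval counter starts from zero each time.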
    
    (cherry picked from commit 91697c5670f3a156ebdc55523de47baea11f5c52)
---
 .../org/apache/pulsar/proxy/socket/client/PerformanceClient.java  | 8 ++++++--
 .../java/org/apache/pulsar/testclient/ManagedLedgerWriter.java    | 8 ++++++--
 .../main/java/org/apache/pulsar/testclient/PerformanceReader.java | 6 +++++-
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 0e6f84b..c6aa5b8 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -301,14 +301,17 @@ public class PerformanceClient {
             long now = System.nanoTime();
             double elapsed = (now - oldTime) / 1e9;
 
+            long total = totalMessagesSent.sum();
             double rate = messagesSent.sumThenReset() / elapsed;
             double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 1024 * 8;
 
             reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram(reportHistogram);
 
             log.info(
-                    "Throughput produced: {}  msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - 99.99pct: {} ms",
-                    throughputFormat.format(rate), throughputFormat.format(throughput),
+                    "Throughput produced: {} msg --- {}  msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - 99.99pct: {} ms",
+                    intFormat.format(total),
+                    throughputFormat.format(rate),
+                    throughputFormat.format(throughput),
                     dec.format(reportHistogram.getMean() / 1000.0),
                     dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
                     dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
@@ -384,6 +387,7 @@ public class PerformanceClient {
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
     static final DecimalFormat totalFormat = new DecimalFormat("0.000");
+    static final DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
     private static final Logger log = LoggerFactory.getLogger(PerformanceClient.class);
 
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 244be81..47de256 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -320,14 +320,17 @@ public class ManagedLedgerWriter {
             long now = System.nanoTime();
             double elapsed = (now - oldTime) / 1e9;
 
+            long total = totalMessagesSent.sum();
             double rate = messagesSent.sumThenReset() / elapsed;
             double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 1024 * 8;
 
             reportHistogram = recorder.getIntervalHistogram(reportHistogram);
 
             log.info(
-                    "Throughput produced: {}  msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
-                    throughputFormat.format(rate), throughputFormat.format(throughput),
+                    "Throughput produced: {} msg --- {}  msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    intFormat.format(total),
+                    throughputFormat.format(rate),
+                    throughputFormat.format(throughput),
                     dec.format(reportHistogram.getMean() / 1000.0),
                     dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
                     dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
@@ -409,5 +412,6 @@ public class ManagedLedgerWriter {
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
     static final DecimalFormat totalFormat = new DecimalFormat("0.000");
+    static final DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerWriter.class);
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index c4becf0..778d51d 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -46,12 +46,14 @@ import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PerformanceReader {
     private static final LongAdder messagesReceived = new LongAdder();
     private static final LongAdder bytesReceived = new LongAdder();
+    private static final DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
     private static final DecimalFormat dec = new DecimalFormat("0.000");
 
     private static final LongAdder totalMessagesReceived = new LongAdder();
@@ -303,12 +305,14 @@ public class PerformanceReader {
 
             long now = System.nanoTime();
             double elapsed = (now - oldTime) / 1e9;
+            long total = totalMessagesReceived.sum();
             double rate = messagesReceived.sumThenReset() / elapsed;
             double throughput = bytesReceived.sumThenReset() / elapsed * 8 / 1024 / 1024;
 
             reportHistogram = recorder.getIntervalHistogram(reportHistogram);
             log.info(
-                    "Read throughput: {}  msg/s -- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    "Read throughput: {} msg --- {}  msg/s -- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    intFormat.format(total),
                     dec.format(rate), dec.format(throughput), dec.format(reportHistogram.getMean()),
                     reportHistogram.getValueAtPercentile(50), reportHistogram.getValueAtPercentile(95),
                     reportHistogram.getValueAtPercentile(99), reportHistogram.getValueAtPercentile(99.9),
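
Note on the three testclient hunks above: each one reads a cumulative
counter with sum() (never reset) next to the per-interval counters that are
read with sumThenReset(). The following is a minimal, self-contained sketch
of that pattern, not code from the commit. The class name, the record() and
report() methods and the padInt() helper are hypothetical; padInt() only
approximates PaddingDecimalFormat("0", 7) under the assumption that it
left-pads with spaces to a minimum width of 7.

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: cumulative total next to per-interval rate counters.
class ThroughputReportSketch {
    // Interval counters, reset on every report (like messagesSent / bytesSent).
    private final LongAdder intervalMessages = new LongAdder();
    private final LongAdder intervalBytes = new LongAdder();
    // Cumulative counter, only ever read with sum() (like totalMessagesSent).
    private final LongAdder totalMessages = new LongAdder();

    void record(int payloadBytes) {
        intervalMessages.increment();
        intervalBytes.add(payloadBytes);
        totalMessages.increment();
    }

    // Assumed stand-in for PaddingDecimalFormat("0", 7): left-pad to width 7.
    static String padInt(long value) {
        DecimalFormat plain = new DecimalFormat("0", DecimalFormatSymbols.getInstance(Locale.ROOT));
        return String.format("%7s", plain.format(value));
    }

    String report(double elapsedSeconds) {
        long total = totalMessages.sum();                                // not reset
        double rate = intervalMessages.sumThenReset() / elapsedSeconds;  // msg/s
        // bytes/s -> MiB/s -> Mbit/s, same arithmetic as in the diff
        double mbit = intervalBytes.sumThenReset() / elapsedSeconds / 1024 / 1024 * 8;
        return String.format(Locale.ROOT, "%s msg --- %.1f msg/s --- %.1f Mbit/s",
                padInt(total), rate, mbit);
    }

    public static void main(String[] args) {
        ThroughputReportSketch sketch = new ThroughputReportSketch();
        for (int i = 0; i < 1000; i++) {
            sketch.record(1024);
        }
        System.out.println(sketch.report(2.0));
    }
}
```

Because the total is read with sum() rather than sumThenReset(), it keeps
growing across reports while the rate and Mbit/s figures describe only the
most recent interval.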

[pulsar] 01/08: [pulsar-client] Make it possible to disable poolMessages (#12108)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cd2dd0618b47bec3c34478d634f66e7293d77a32
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Fri Sep 24 18:29:59 2021 +0800

    [pulsar-client] Make it possible to disable poolMessages (#12108)
    
    
    (cherry picked from commit 4aae519c01e48439fc074ef6e8a799272a411344)
---
 .../src/main/java/org/apache/pulsar/client/cli/CmdConsume.java          | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index b614be0..6264ed6 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -127,7 +127,7 @@ public class CmdConsume {
     @Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
     private String schematype = "bytes";
 
-    @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+    @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message", arity = 1)
     private boolean poolMessages = true;
 
     private ClientBuilder clientBuilder;
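
Note on the hunk above: poolMessages defaults to true, so a flag that takes
no value gives the user no way to pass "false" explicitly. With arity = 1
the option consumes the next token as its value (for example "-pm false").
The sketch below illustrates that difference with a hand-written parser. It
is not JCommander code; the class and method names are hypothetical, and
the arity-0 branch assumes that the mere presence of the flag sets the
value to true, which may differ between JCommander versions.

```java
// Hypothetical sketch of arity-0 vs arity-1 handling for a boolean flag.
class BooleanFlagAritySketch {
    /**
     * Returns the value of a boolean flag found in args.
     * arity 0: presence of the flag means true (assumption, see note above).
     * arity 1: the token following the flag is parsed as the value.
     */
    static boolean parse(String[] args, String flag, int arity, boolean defaultValue) {
        boolean value = defaultValue;
        for (int i = 0; i < args.length; i++) {
            if (!args[i].equals(flag)) {
                continue;
            }
            if (arity == 0) {
                value = true;
            } else {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Expected a value after " + flag);
                }
                value = Boolean.parseBoolean(args[++i]); // consume the value token
            }
        }
        return value;
    }

    public static void main(String[] args) {
        // With arity 1 the default of true can be overridden explicitly.
        System.out.println(parse(new String[] {"-pm", "false"}, "-pm", 1, true));
        // Under the arity-0 assumption the flag can only ever yield true.
        System.out.println(parse(new String[] {"-pm"}, "-pm", 0, true));
    }
}
```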