You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/12 03:26:58 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request #9182: Fix incoming message size issue that introduced in #9113

codelipenghui opened a new pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182


   ### Motivation
   
   Fix incoming message size issue that introduced in #9113. We should decrease the incoming message size when taking messages from the queue and increase the incoming message size while adding messages to the queue. With #9113, will always increase the incoming queue size.
   
   ### Modifications
   
   Add method `increaseIncomingSize` and `decreaseIncomingSize`
   
   ### Verifying this change
   
   Add a new test for verifying the incoming message size should be zero while the incoming queue size is zero.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#discussion_r555512060



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -847,13 +847,23 @@ protected boolean hasPendingBatchReceive() {
         return pendingBatchReceives != null && peekNextBatchReceive() != null;
     }
 
+    protected void increaseIncomingMessageSize(final Message<?> message) {
+        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
+                this, message.getData() == null ? 0 : message.getData().length);
+    }
+
     protected void resetIncomingMessageSize() {
         INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
     }
 
-    protected void updateIncomingMessageSize(final Message<?> message) {
+    protected void decreaseIncomingMessageSize(final Message<?> message) {
         INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
-                (message.getData() != null) ? message.getData().length : 0);
+                (message.getData() != null) ? -message.getData().length : 0);
+    }
+
+    @VisibleForTesting

Review comment:
       Public methods should not need this annotation




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#discussion_r555501885



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3692,4 +3693,88 @@ public void testGetStatsForPartitionedTopic() throws Exception {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testIncomingMessageSizeForNonPartitionedTopic() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testIncomingMessageSizeForNonPartitionedTopic-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            messageIds.add(producer.newMessage().value(("Message-" + i).getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).get();
+
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
+            log.info("Check the incoming message size should greater that 0, current size is {}", size);
+            Assert.assertTrue(size > 0);
+        });
+
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledge(consumer.receive());
+        }
+
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
+            log.info("Check the incoming message size should be 0, current size is {}", size);
+            Assert.assertEquals(size, 0);
+        });
+    }
+
+    @Test
+    public void testIncomingMessageSizeForPartitionedTopic() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testIncomingMessageSizeForPartitionedTopic-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        admin.topics().createPartitionedTopic(topicName, 3);

Review comment:
       It's the only difference between `testIncomingMessageSizeForPartitionedTopic` and `testIncomingMessageSizeForNonPartitionedTopic`, IMO it's better to use `DataProvider` to reduce repeated logic, see `NullValueTest` for example.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#discussion_r555510986



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3692,4 +3693,88 @@ public void testGetStatsForPartitionedTopic() throws Exception {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testIncomingMessageSizeForNonPartitionedTopic() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testIncomingMessageSizeForNonPartitionedTopic-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            messageIds.add(producer.newMessage().value(("Message-" + i).getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).get();

Review comment:
       nitpick: in general, it's good to use timeouts in tests, for example using `.get(3, TimeUnit.SECONDS)`. However, I guess it's not very likely that sending would fail at this point, so the timeout isn't that relevant in this case. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#issuecomment-758371013


   @BewareMyPower @lhotari Please help review this PR


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#discussion_r555550495



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3692,4 +3693,88 @@ public void testGetStatsForPartitionedTopic() throws Exception {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testIncomingMessageSizeForNonPartitionedTopic() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testIncomingMessageSizeForNonPartitionedTopic-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            messageIds.add(producer.newMessage().value(("Message-" + i).getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).get();

Review comment:
       Yes. The TestNG has global timeout settings.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#issuecomment-758371769


   OK, I'll take a look soon.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#issuecomment-758439243


   The import of VisibleForTesting has not been deleted, I don’t know if the style check will report an error


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#discussion_r555514953



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3692,4 +3693,88 @@ public void testGetStatsForPartitionedTopic() throws Exception {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testIncomingMessageSizeForNonPartitionedTopic() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testIncomingMessageSizeForNonPartitionedTopic-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            messageIds.add(producer.newMessage().value(("Message-" + i).getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).get();

Review comment:
       I don't think using timeout is a good idea. We should rely on the test framework timeout not the timeouts within the test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#discussion_r555519499



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3692,4 +3693,88 @@ public void testGetStatsForPartitionedTopic() throws Exception {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testIncomingMessageSizeForNonPartitionedTopic() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testIncomingMessageSizeForNonPartitionedTopic-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            messageIds.add(producer.newMessage().value(("Message-" + i).getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).get();

Review comment:
       @sijie yes, I agree that placing timeouts all over could clutter the test code. 
   Do you happen to know if there's already a default timeout set for each individual test method?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #9182: Fix incoming message size issue that introduced in #9113

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9182:
URL: https://github.com/apache/pulsar/pull/9182#issuecomment-758416703


   @BewareMyPower @315157973 I have addressed your comments, please take a look again.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org