You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/13 01:55:17 UTC

[pulsar] branch master updated: Fix incoming message size issue that introduced in #9113 (#9182)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cee0a8  Fix incoming message size issue that introduced in #9113 (#9182)
2cee0a8 is described below

commit 2cee0a8395c5deecf5584e30ac55c370161f2b45
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jan 13 09:54:49 2021 +0800

    Fix incoming message size issue that introduced in #9113 (#9182)
    
    ### Motivation
    
    Fix the incoming message size issue that was introduced in #9113. We should decrease the incoming message size when taking messages from the queue and increase it when adding messages to the queue. With #9113, the incoming message size was always increased, even when messages were taken from the queue.
    
    ### Modifications
    
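    Add the method `increaseIncomingMessageSize` and rename `updateIncomingMessageSize` to `decreaseIncomingMessageSize`, which now subtracts the message size instead of adding it.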
    
    ### Verifying this change
    
    Add a new test that verifies the incoming message size is zero once the incoming queue has been drained.
---
 .../client/api/SimpleProducerConsumerTest.java     | 51 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 19 +++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  4 +-
 .../client/impl/MultiTopicsConsumerImpl.java       | 12 ++---
 4 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 6354e60..9fc14c0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -77,6 +77,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
@@ -3692,4 +3693,54 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
+
+    @DataProvider(name = "partitioned")
+    public static Object[] isPartitioned() {
+        return new Object[] {false, true};
+    }
+
+    @Test(dataProvider = "partitioned")
+    public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        if (isPartitioned) {
+            admin.topics().createPartitionedTopic(topicName, 3);
+        }
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).get();
+
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
+            log.info("Check the incoming message size should greater that 0, current size is {}", size);
+            Assert.assertTrue(size > 0);
+        });
+
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledge(consumer.receive());
+        }
+
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
+            log.info("Check the incoming message size should be 0, current size is {}", size);
+            Assert.assertEquals(size, 0);
+        });
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index e258704..b100a7a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import java.util.Collections;
 import java.util.List;
@@ -664,8 +665,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
-            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
-                    this, message.getData() == null ? 0 : message.getData().length);
+            increaseIncomingMessageSize(message);
         }
         return hasEnoughMessagesForBatchReceive();
     }
@@ -675,7 +675,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             return false;
         }
         return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
-                || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes());
+                || (batchReceivePolicy.getMaxNumBytes() > 0 && getIncomingMessageSize() >= batchReceivePolicy.getMaxNumBytes());
     }
 
     private void verifyConsumerState() throws PulsarClientException {
@@ -847,13 +847,22 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return pendingBatchReceives != null && peekNextBatchReceive() != null;
     }
 
+    protected void increaseIncomingMessageSize(final Message<?> message) {
+        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
+                this, message.getData() == null ? 0 : message.getData().length);
+    }
+
     protected void resetIncomingMessageSize() {
         INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
     }
 
-    protected void updateIncomingMessageSize(final Message<?> message) {
+    protected void decreaseIncomingMessageSize(final Message<?> message) {
         INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
-                (message.getData() != null) ? message.getData().length : 0);
+                (message.getData() != null) ? -message.getData().length : 0);
+    }
+
+    public long getIncomingMessageSize() {
+        return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
     }
 
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 74c8eea..e9d7ad2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1507,7 +1507,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         stats.updateNumMsgsReceived(msg);
 
         trackMessage(msg);
-        updateIncomingMessageSize(msg);
+        decreaseIncomingMessageSize(msg);
     }
 
     protected void trackMessage(Message<?> msg) {
@@ -2197,7 +2197,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             // try not to remove elements that are added while we remove
             Message<T> message = incomingMessages.poll();
             while (message != null) {
-                updateIncomingMessageSize(message);
+                decreaseIncomingMessageSize(message);
                 messagesFromQueue++;
                 MessageIdImpl id = getMessageIdImpl(message);
                 if (!messageIds.contains(id)) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ef5d1f2..8910393 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -312,7 +312,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     @Override
     protected synchronized void messageProcessed(Message<?> msg) {
         unAckedMessageTracker.add(msg.getMessageId());
-        updateIncomingMessageSize(msg);
+        decreaseIncomingMessageSize(msg);
     }
 
     private void resumeReceivingFromPausedConsumersIfNeeded() {
@@ -335,7 +335,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         Message<T> message;
         try {
             message = incomingMessages.take();
-            updateIncomingMessageSize(message);
+            decreaseIncomingMessageSize(message);
             checkState(message instanceof TopicMessageImpl);
             unAckedMessageTracker.add(message.getMessageId());
             resumeReceivingFromPausedConsumersIfNeeded();
@@ -351,7 +351,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         try {
             message = incomingMessages.poll(timeout, unit);
             if (message != null) {
-                updateIncomingMessageSize(message);
+                decreaseIncomingMessageSize(message);
                 checkArgument(message instanceof TopicMessageImpl);
                 unAckedMessageTracker.add(message.getMessageId());
             }
@@ -392,7 +392,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 while (msgPeeked != null && messages.canAdd(msgPeeked)) {
                     Message<T> msg = incomingMessages.poll();
                     if (msg != null) {
-                        updateIncomingMessageSize(msg);
+                        decreaseIncomingMessageSize(msg);
                         Message<T> interceptMsg = beforeConsume(msg);
                         messages.add(interceptMsg);
                     }
@@ -420,7 +420,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             pendingReceives.add(result);
             cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
         } else {
-            updateIncomingMessageSize(message);
+            decreaseIncomingMessageSize(message);
             checkState(message instanceof TopicMessageImpl);
             unAckedMessageTracker.add(message.getMessageId());
             resumeReceivingFromPausedConsumersIfNeeded();
@@ -785,7 +785,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             Message<T> message = incomingMessages.poll();
             checkState(message instanceof TopicMessageImpl);
             while (message != null) {
-                updateIncomingMessageSize(message);
+                decreaseIncomingMessageSize(message);
                 MessageId messageId = message.getMessageId();
                 if (!messageIds.contains(messageId)) {
                     messageIds.add(messageId);