You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/06 06:49:45 UTC

[pulsar] branch branch-2.6 updated: Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 68793d5  Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)
68793d5 is described below

commit 68793d506742d6e6312f774d10d440912d4acb01
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Jan 4 11:25:14 2021 -0600

    Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)
    
    [#6379](https://github.com/apache/pulsar/pull/6379) introduced the feature to handle null value messages, but it only checks the null value in `ConsumerImpl` when `INCOMING_MESSAGES_SIZE_UPDATER` is updated. Therefore, if a partitioned topic with at least 2 partitions was consumed with a null value message, the NPE would be thrown.
    
    - Check the null value message in `MultiTopicsConsumerImpl` as well as `ConsumerImpl`. To reduce repeated code, two protected methods are added to `ConsumerBase` and `INCOMING_MESSAGES_SIZE_UPDATER` is now private; the derived consumer classes use these two methods to update or reset `INCOMING_MESSAGES_SIZE_UPDATER`.
    - Add tests for partitioned topics in `NullValueTest`. Since the existing tests rely on the message send order, messages are sent to a single partition only.
    
    (cherry picked from commit dd3b9d8115ccaa409957a089ab04ea16b731e5a2)
---
 .../pulsar/broker/service/NullValueTest.java       | 52 +++++++++++++++++-----
 .../apache/pulsar/client/impl/ConsumerBase.java    | 11 ++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 12 ++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 14 +++---
 4 files changed, 63 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
index 855bda4..5ac8375 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -21,17 +21,22 @@ package org.apache.pulsar.broker.service;
 import java.util.concurrent.CompletableFuture;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -52,13 +57,23 @@ public class NullValueTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
-    @Test
-    public void nullValueBytesSchemaTest() throws PulsarClientException {
-        String topic = "persistent://prop/ns-abc/null-value-bytes-test";
+    @DataProvider(name = "topics")
+    public static Object[][] topics() { // each row: {topic name, partition count}
+        return new Object[][]{
+                {"persistent://prop/ns-abc/null-value-test-0", 1},
+                {"persistent://prop/ns-abc/null-value-test-1", 3},
+        };
+    }
+
+    @Test(dataProvider = "topics")
+    public void nullValueBytesSchemaTest(String topic, int partitions)
+            throws PulsarClientException, PulsarAdminException {
+        admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
         Producer producer = pulsarClient.newProducer()
                 .topic(topic)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
         @Cleanup
@@ -120,13 +135,15 @@ public class NullValueTest extends BrokerTestBase {
 
     }
 
-    @Test
-    public void nullValueBooleanSchemaTest() throws PulsarClientException {
-        String topic = "persistent://prop/ns-abc/null-value-bool-test";
+    @Test(dataProvider = "topics")
+    public void nullValueBooleanSchemaTest(String topic, int partitions)
+            throws PulsarClientException, PulsarAdminException {
+        admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
         Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL)
                 .topic(topic)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
         @Cleanup
@@ -148,14 +165,16 @@ public class NullValueTest extends BrokerTestBase {
 
     }
 
-    @Test
-    public void keyValueNullInlineTest() throws PulsarClientException {
-        String topic = "persistent://prop/ns-abc/kv-null-value-test";
+    @Test(dataProvider = "topics")
+    public void keyValueNullInlineTest(String topic, int partitions)
+            throws PulsarClientException, PulsarAdminException {
+        admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
         Producer<KeyValue<String, String>> producer = pulsarClient
                 .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
                 .topic(topic)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
         @Cleanup
@@ -193,14 +212,23 @@ public class NullValueTest extends BrokerTestBase {
 
     }
 
-    @Test
-    public void keyValueNullSeparatedTest() throws PulsarClientException {
-        String topic = "persistent://prop/ns-abc/kv-null-value-test";
+    @Test(dataProvider = "topics")
+    public void keyValueNullSeparatedTest(String topic, int partitions)
+            throws PulsarClientException, PulsarAdminException {
+        admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
         Producer<KeyValue<String, String>> producer = pulsarClient
                 .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
                 .topic(topic)
+                // The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is
+                // SEPARATED so we need to define a message router to guarantee the message order.
+                .messageRouter(new MessageRouter() {
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+                        return 0;
+                    }
+                })
                 .create();
 
         @Cleanup
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 26139ec..da5302c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -77,7 +77,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final ConsumerInterceptors<T> interceptors;
     protected final BatchReceivePolicy batchReceivePolicy;
     protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
-    protected static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
+    private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ConsumerBase.class, "incomingMessagesSize");
     protected volatile long incomingMessagesSize = 0;
     protected volatile Timeout batchReceiveTimeout = null;
@@ -813,6 +813,15 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return pendingBatchReceives != null && peekNextBatchReceive() != null;
     }
 
+    protected void resetIncomingMessageSize() {
+        INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); // atomically zero the incomingMessagesSize counter
+    }
+
+    protected void updateIncomingMessageSize(final Message<?> message) {
+        // Called after `message` is dequeued: subtract its payload size (a null payload counts as 0).
+        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, (message.getData() != null) ? -message.getData().length : 0);
+    }
+
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
 
     private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 49dff72..04c0427 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -868,7 +868,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private BatchMessageIdImpl clearReceiverQueue() {
         List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
         incomingMessages.drainTo(currentMessageQueue);
-        INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+        resetIncomingMessageSize();
 
         if (duringSeek.compareAndSet(true, false)) {
             return seekMessageId;
@@ -1438,7 +1438,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         stats.updateNumMsgsReceived(msg);
 
         trackMessage(msg);
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length);
+        updateIncomingMessageSize(msg);
     }
 
     protected void trackMessage(Message<?> msg) {
@@ -1644,7 +1644,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             synchronized (this) {
                 currentSize = incomingMessages.size();
                 incomingMessages.clear();
-                INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+                resetIncomingMessageSize();
                 unAckedMessageTracker.clear();
             }
             cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
@@ -1814,7 +1814,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             lastDequeuedMessageId = MessageId.earliest;
 
             incomingMessages.clear();
-            INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+            resetIncomingMessageSize();
             seekFuture.complete(null);
         }).exceptionally(e -> {
             log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
@@ -1875,7 +1875,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             lastDequeuedMessageId = MessageId.earliest;
 
             incomingMessages.clear();
-            INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+            resetIncomingMessageSize();
             seekFuture.complete(null);
         }).exceptionally(e -> {
             log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
@@ -2115,7 +2115,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             // try not to remove elements that are added while we remove
             Message<T> message = incomingMessages.poll();
             while (message != null) {
-                INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+                updateIncomingMessageSize(message);
                 messagesFromQueue++;
                 MessageIdImpl id = getMessageIdImpl(message);
                 if (!messageIds.contains(id)) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 21209c6..c602772 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -285,7 +285,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     protected synchronized void messageProcessed(Message<?> msg) {
         unAckedMessageTracker.add(msg.getMessageId());
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
+        updateIncomingMessageSize(msg);
     }
 
     private void resumeReceivingFromPausedConsumersIfNeeded() {
@@ -308,7 +308,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         Message<T> message;
         try {
             message = incomingMessages.take();
-            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+            updateIncomingMessageSize(message);
             checkState(message instanceof TopicMessageImpl);
             unAckedMessageTracker.add(message.getMessageId());
             resumeReceivingFromPausedConsumersIfNeeded();
@@ -324,7 +324,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         try {
             message = incomingMessages.poll(timeout, unit);
             if (message != null) {
-                INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+                updateIncomingMessageSize(message);
                 checkArgument(message instanceof TopicMessageImpl);
                 unAckedMessageTracker.add(message.getMessageId());
             }
@@ -365,7 +365,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 while (msgPeeked != null && messages.canAdd(msgPeeked)) {
                     Message<T> msg = incomingMessages.poll();
                     if (msg != null) {
-                        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
+                        updateIncomingMessageSize(msg);
                         Message<T> interceptMsg = beforeConsume(msg);
                         messages.add(interceptMsg);
                     }
@@ -393,7 +393,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             pendingReceives.add(result);
             cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
         } else {
-            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+            updateIncomingMessageSize(message);
             checkState(message instanceof TopicMessageImpl);
             unAckedMessageTracker.add(message.getMessageId());
             resumeReceivingFromPausedConsumersIfNeeded();
@@ -571,7 +571,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 consumer.unAckedChunckedMessageIdSequenceMap.clear();
             });
             incomingMessages.clear();
-            INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+            resetIncomingMessageSize();
             unAckedMessageTracker.clear();
         } finally {
             lock.writeLock().unlock();
@@ -679,7 +679,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             Message<T> message = incomingMessages.poll();
             checkState(message instanceof TopicMessageImpl);
             while (message != null) {
-                INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+                updateIncomingMessageSize(message);
                 MessageId messageId = message.getMessageId();
                 if (!messageIds.contains(messageId)) {
                     messageIds.add(messageId);