You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/06 06:49:45 UTC
[pulsar] branch branch-2.6 updated: Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 68793d5 Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)
68793d5 is described below
commit 68793d506742d6e6312f774d10d440912d4acb01
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Jan 4 11:25:14 2021 -0600
Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)
[#6379](https://github.com/apache/pulsar/pull/6379) introduced support for null value messages, but the null check was only added in `ConsumerImpl`, at the point where `INCOMING_MESSAGES_SIZE_UPDATER` is updated. As a result, consuming a null value message from a partitioned topic with at least 2 partitions (which is handled by `MultiTopicsConsumerImpl`) throws an NPE.
- Check for null value messages in `MultiTopicsConsumerImpl` as well as `ConsumerImpl`. To reduce repeated code, two protected methods are added to `ConsumerBase` and `INCOMING_MESSAGES_SIZE_UPDATER` is now private; the derived consumer classes use these two methods to update or reset `INCOMING_MESSAGES_SIZE_UPDATER`.
- Add tests for partitioned topics in `NullValueTest`. Since the existing tests rely on the message send order, the tests send messages to a single partition only.
(cherry picked from commit dd3b9d8115ccaa409957a089ab04ea16b731e5a2)
---
.../pulsar/broker/service/NullValueTest.java | 52 +++++++++++++++++-----
.../apache/pulsar/client/impl/ConsumerBase.java | 11 ++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 12 ++---
.../client/impl/MultiTopicsConsumerImpl.java | 14 +++---
4 files changed, 63 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
index 855bda4..5ac8375 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -21,17 +21,22 @@ package org.apache.pulsar.broker.service;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
@@ -52,13 +57,23 @@ public class NullValueTest extends BrokerTestBase {
super.internalCleanup();
}
- @Test
- public void nullValueBytesSchemaTest() throws PulsarClientException {
- String topic = "persistent://prop/ns-abc/null-value-bytes-test";
+ @DataProvider(name = "topics")
+ public static Object[][] topics() {
+ return new Object[][]{
+ {"persistent://prop/ns-abc/null-value-test-0", 1},
+ {"persistent://prop/ns-abc/null-value-test-1", 3},
+ };
+ }
+
+ @Test(dataProvider = "topics")
+ public void nullValueBytesSchemaTest(String topic, int partitions)
+ throws PulsarClientException, PulsarAdminException {
+ admin.topics().createPartitionedTopic(topic, partitions);
@Cleanup
Producer producer = pulsarClient.newProducer()
.topic(topic)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
@Cleanup
@@ -120,13 +135,15 @@ public class NullValueTest extends BrokerTestBase {
}
- @Test
- public void nullValueBooleanSchemaTest() throws PulsarClientException {
- String topic = "persistent://prop/ns-abc/null-value-bool-test";
+ @Test(dataProvider = "topics")
+ public void nullValueBooleanSchemaTest(String topic, int partitions)
+ throws PulsarClientException, PulsarAdminException {
+ admin.topics().createPartitionedTopic(topic, partitions);
@Cleanup
Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL)
.topic(topic)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
@Cleanup
@@ -148,14 +165,16 @@ public class NullValueTest extends BrokerTestBase {
}
- @Test
- public void keyValueNullInlineTest() throws PulsarClientException {
- String topic = "persistent://prop/ns-abc/kv-null-value-test";
+ @Test(dataProvider = "topics")
+ public void keyValueNullInlineTest(String topic, int partitions)
+ throws PulsarClientException, PulsarAdminException {
+ admin.topics().createPartitionedTopic(topic, partitions);
@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.topic(topic)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
@Cleanup
@@ -193,14 +212,23 @@ public class NullValueTest extends BrokerTestBase {
}
- @Test
- public void keyValueNullSeparatedTest() throws PulsarClientException {
- String topic = "persistent://prop/ns-abc/kv-null-value-test";
+ @Test(dataProvider = "topics")
+ public void keyValueNullSeparatedTest(String topic, int partitions)
+ throws PulsarClientException, PulsarAdminException {
+ admin.topics().createPartitionedTopic(topic, partitions);
@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
+ // The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is
+ // SEPARATED so we need to define a message router to guarantee the message order.
+ .messageRouter(new MessageRouter() {
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+ return 0;
+ }
+ })
.create();
@Cleanup
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 26139ec..da5302c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -77,7 +77,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final ConsumerInterceptors<T> interceptors;
protected final BatchReceivePolicy batchReceivePolicy;
protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
- protected static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
+ private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
.newUpdater(ConsumerBase.class, "incomingMessagesSize");
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
@@ -813,6 +813,15 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return pendingBatchReceives != null && peekNextBatchReceive() != null;
}
+ protected void resetIncomingMessageSize() {
+ INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+ }
+
+ protected void updateIncomingMessageSize(final Message<?> message) {
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
+ (message.getData() != null) ? message.getData().length : 0);
+ }
+
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 49dff72..04c0427 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -868,7 +868,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private BatchMessageIdImpl clearReceiverQueue() {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
- INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+ resetIncomingMessageSize();
if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
@@ -1438,7 +1438,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
stats.updateNumMsgsReceived(msg);
trackMessage(msg);
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length);
+ updateIncomingMessageSize(msg);
}
protected void trackMessage(Message<?> msg) {
@@ -1644,7 +1644,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
synchronized (this) {
currentSize = incomingMessages.size();
incomingMessages.clear();
- INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+ resetIncomingMessageSize();
unAckedMessageTracker.clear();
}
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
@@ -1814,7 +1814,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
lastDequeuedMessageId = MessageId.earliest;
incomingMessages.clear();
- INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+ resetIncomingMessageSize();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
@@ -1875,7 +1875,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
lastDequeuedMessageId = MessageId.earliest;
incomingMessages.clear();
- INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+ resetIncomingMessageSize();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
@@ -2115,7 +2115,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// try not to remove elements that are added while we remove
Message<T> message = incomingMessages.poll();
while (message != null) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+ updateIncomingMessageSize(message);
messagesFromQueue++;
MessageIdImpl id = getMessageIdImpl(message);
if (!messageIds.contains(id)) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 21209c6..c602772 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -285,7 +285,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
protected synchronized void messageProcessed(Message<?> msg) {
unAckedMessageTracker.add(msg.getMessageId());
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
+ updateIncomingMessageSize(msg);
}
private void resumeReceivingFromPausedConsumersIfNeeded() {
@@ -308,7 +308,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
Message<T> message;
try {
message = incomingMessages.take();
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+ updateIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
@@ -324,7 +324,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
try {
message = incomingMessages.poll(timeout, unit);
if (message != null) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+ updateIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
}
@@ -365,7 +365,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Message<T> msg = incomingMessages.poll();
if (msg != null) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
+ updateIncomingMessageSize(msg);
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
@@ -393,7 +393,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+ updateIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
@@ -571,7 +571,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
consumer.unAckedChunckedMessageIdSequenceMap.clear();
});
incomingMessages.clear();
- INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+ resetIncomingMessageSize();
unAckedMessageTracker.clear();
} finally {
lock.writeLock().unlock();
@@ -679,7 +679,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
Message<T> message = incomingMessages.poll();
checkState(message instanceof TopicMessageImpl);
while (message != null) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+ updateIncomingMessageSize(message);
MessageId messageId = message.getMessageId();
if (!messageIds.contains(messageId)) {
messageIds.add(messageId);