Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:37:59 UTC

[pulsar] branch branch-2.7 updated (4ed43c9 -> 9b5a4b2)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 4ed43c9  Monitor if a cursor moves its mark-delete position (#8930)
     new a327829  Fix regression in apply-config-from-env.py (#9097)
     new 44a70d9  [cpp-client] Fix compilation issue caused by non-virtual destructor in consumer. (#9105) (#9106)
     new 9b5a4b2  Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docker/pulsar/scripts/apply-config-from-env.py     |  8 ----
 .../pulsar/broker/service/NullValueTest.java       | 52 +++++++++++++++++-----
 pulsar-client-cpp/include/pulsar/Consumer.h        |  1 +
 .../apache/pulsar/client/impl/ConsumerBase.java    | 11 ++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 14 +++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 16 +++----
 6 files changed, 66 insertions(+), 36 deletions(-)


[pulsar] 01/03: Fix regression in apply-config-from-env.py (#9097)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a32782944e1e7f7da8f4bc1cb2483cb137259bd2
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Jan 4 08:54:35 2021 +0800

    Fix regression in apply-config-from-env.py (#9097)
    
    ### Motivation
    
    Fix the regression introduced in #8709.
    
    In #8709, a value that contains spaces is wrapped in double quotes, as in "value". This is a breaking change for users whose existing configs already have values that contain spaces, so this PR reverts that change.
    
    If users want to ensure that a value is processed as a group, they should use `export key=\"value\"` rather than rely on `""` being added implicitly when the value contains spaces.
    
    (cherry picked from commit 4ad499d56307d52f616e4dbfd0bbd3693c26d14b)
---
 docker/pulsar/scripts/apply-config-from-env.py | 8 --------
 1 file changed, 8 deletions(-)
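
Note: the behavior change described above can be sketched as follows. This is a minimal illustration, not code from the script: `render_value` and `quote_spaces` are hypothetical names, and the real script performs these steps inline inside its loops over `os.environ`.

```python
def render_value(raw, quote_spaces):
    """Return the value roughly as the script would write it to a config file.

    `render_value` and `quote_spaces` are hypothetical names used only for
    this illustration.
    """
    v = raw.strip()
    # Implicit quoting added in #8709 and removed again by this commit.
    if quote_spaces and v.find(" ") >= 0:
        v = '"%s"' % v
    return v


value = "-Xms2g -Xmx2g"
print(render_value(value, quote_spaces=True))   # behavior after #8709
print(render_value(value, quote_spaces=False))  # behavior after this revert
```

With `quote_spaces=False` the value passes through unchanged apart from stripping, so any quoting has to be supplied explicitly in the environment variable itself.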

diff --git a/docker/pulsar/scripts/apply-config-from-env.py b/docker/pulsar/scripts/apply-config-from-env.py
index 948254b..992b8f7 100755
--- a/docker/pulsar/scripts/apply-config-from-env.py
+++ b/docker/pulsar/scripts/apply-config-from-env.py
@@ -57,10 +57,6 @@ for conf_filename in conf_files:
     for k in sorted(os.environ.keys()):
         v = os.environ[k].strip()
 
-        # Quote the value if it contains a space.
-        if v.find(" ") >= 0:
-            v = '\"%s\"' % v
-
         # Hide the value in logs if is password.
         if "password" in k:
             displayValue = "********"
@@ -81,10 +77,6 @@ for conf_filename in conf_files:
         if not k.startswith(PF_ENV_PREFIX):
             continue
 
-        # Quote the value if it contains a space.
-        if v.find(" ") >= 0:
-            v = '\"%s\"' % v
-
         # Hide the value in logs if is password.
         if "password" in k:
             displayValue = "********"


[pulsar] 03/03: Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9b5a4b22ca61dd4c2418c93145c1e4f0eed86255
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Jan 4 11:25:14 2021 -0600

    Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113)
    
    ### Motivation
    
    [#6379](https://github.com/apache/pulsar/pull/6379) introduced support for null value messages, but the null check performed when `INCOMING_MESSAGES_SIZE_UPDATER` is updated exists only in `ConsumerImpl`. Therefore, when a consumer of a partitioned topic with at least 2 partitions receives a null value message, an NPE is thrown.
    
    ### Modifications
    
    - Check for null value messages in `MultiTopicsConsumerImpl` as well as in `ConsumerImpl`. To reduce repeated code, two protected methods are added to `ConsumerBase` and `INCOMING_MESSAGES_SIZE_UPDATER` becomes private; the derived consumer classes use these two methods to update or reset `INCOMING_MESSAGES_SIZE_UPDATER`.
    - Add tests for partitioned topics in `NullValueTest`. Since the existing tests rely on the message send order, messages are sent to a single partition only.
    
    (cherry picked from commit dd3b9d8115ccaa409957a089ab04ea16b731e5a2)
---
 .../pulsar/broker/service/NullValueTest.java       | 52 +++++++++++++++++-----
 .../apache/pulsar/client/impl/ConsumerBase.java    | 11 ++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 14 +++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 16 +++----
 4 files changed, 65 insertions(+), 28 deletions(-)
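
Note: the null check described in the commit message can be sketched outside of Java as follows. This is a simplified Python model under stated assumptions, not the Pulsar client code: `SizeTracker`, `on_enqueue`, `on_dequeue`, and `reset` are hypothetical names, and `None` stands in for the payload of a null value message.

```python
class SizeTracker:
    """Tracks the total payload bytes of queued messages (illustrative only)."""

    def __init__(self):
        self.incoming_size = 0

    @staticmethod
    def _payload_len(data):
        # A null value message carries no payload; count it as zero bytes
        # instead of asking None for its length.
        return len(data) if data is not None else 0

    def on_enqueue(self, data):
        # Grow the counter when a message enters the receive queue.
        self.incoming_size += self._payload_len(data)

    def on_dequeue(self, data):
        # Shrink the counter when a message leaves the receive queue.
        self.incoming_size -= self._payload_len(data)

    def reset(self):
        # Used when the receive queue is cleared as a whole.
        self.incoming_size = 0


tracker = SizeTracker()
for payload in (b"abc", None, b"de"):
    tracker.on_enqueue(payload)
tracker.on_dequeue(None)  # would raise TypeError without the None check
print(tracker.incoming_size)
```

Keeping the guard in one shared helper means every queue path gets the same handling, which is the motivation the commit message gives for moving the logic into the base class.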

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
index 553fe01..6473b2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -21,17 +21,22 @@ package org.apache.pulsar.broker.service;
 import java.util.concurrent.CompletableFuture;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -52,13 +57,23 @@ public class NullValueTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
-    @Test
-    public void nullValueBytesSchemaTest() throws PulsarClientException {
-        String topic = "persistent://prop/ns-abc/null-value-bytes-test";
+    @DataProvider(name = "topics")
+    public static Object[][] topics() {
+        return new Object[][]{
+                {"persistent://prop/ns-abc/null-value-test-0", 1},
+                {"persistent://prop/ns-abc/null-value-test-1", 3},
+        };
+    }
+
+    @Test(dataProvider = "topics")
+    public void nullValueBytesSchemaTest(String topic, int partitions)
+            throws PulsarClientException, PulsarAdminException {
+        admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
         Producer producer = pulsarClient.newProducer()
                 .topic(topic)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
         @Cleanup
@@ -120,13 +135,15 @@ public class NullValueTest extends BrokerTestBase {
 
     }
 
-    @Test
-    public void nullValueBooleanSchemaTest() throws PulsarClientException {
-        String topic = "persistent://prop/ns-abc/null-value-bool-test";
+    @Test(dataProvider = "topics")
+    public void nullValueBooleanSchemaTest(String topic, int partitions)
+            throws PulsarClientException, PulsarAdminException {
+        admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
         Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL)
                 .topic(topic)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
         @Cleanup
@@ -148,14 +165,16 @@ public class NullValueTest extends BrokerTestBase {
 
     }
 
-    @Test
-    public void keyValueNullInlineTest() throws PulsarClientException {
-        String topic = "persistent://prop/ns-abc/kv-null-value-test";
+    @Test(dataProvider = "topics")
+    public void keyValueNullInlineTest(String topic, int partitions)
+            throws PulsarClientException, PulsarAdminException {
+        admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
         Producer<KeyValue<String, String>> producer = pulsarClient
                 .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
                 .topic(topic)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
         @Cleanup
@@ -193,14 +212,23 @@ public class NullValueTest extends BrokerTestBase {
 
     }
 
-    @Test
-    public void keyValueNullSeparatedTest() throws PulsarClientException {
-        String topic = "persistent://prop/ns-abc/kv-null-value-test";
+    @Test(dataProvider = "topics")
+    public void keyValueNullSeparatedTest(String topic, int partitions)
+            throws PulsarClientException, PulsarAdminException {
+        admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
         Producer<KeyValue<String, String>> producer = pulsarClient
                 .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
                 .topic(topic)
+                // The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is
+                // SEPARATED so we need to define a message router to guarantee the message order.
+                .messageRouter(new MessageRouter() {
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+                        return 0;
+                    }
+                })
                 .create();
 
         @Cleanup
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1db6aee..c184f9a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -78,7 +78,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final ConsumerInterceptors<T> interceptors;
     protected final BatchReceivePolicy batchReceivePolicy;
     protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
-    protected static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
+    private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ConsumerBase.class, "incomingMessagesSize");
     protected volatile long incomingMessagesSize = 0;
     protected volatile Timeout batchReceiveTimeout = null;
@@ -851,6 +851,15 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return pendingBatchReceives != null && peekNextBatchReceive() != null;
     }
 
+    protected void resetIncomingMessageSize() {
+        INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+    }
+
+    protected void updateIncomingMessageSize(final Message<?> message) {
+        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
+                (message.getData() != null) ? message.getData().length : 0);
+    }
+
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
 
     private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2062671..b5c3c01 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -941,7 +941,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private BatchMessageIdImpl clearReceiverQueue() {
         List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
         incomingMessages.drainTo(currentMessageQueue);
-        INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+        resetIncomingMessageSize();
 
         if (duringSeek.compareAndSet(true, false)) {
             return seekMessageId;
@@ -1528,7 +1528,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         stats.updateNumMsgsReceived(msg);
 
         trackMessage(msg);
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length);
+        updateIncomingMessageSize(msg);
     }
 
     protected void trackMessage(Message<?> msg) {
@@ -1738,7 +1738,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             synchronized (this) {
                 currentSize = incomingMessages.size();
                 incomingMessages.clear();
-                INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+                resetIncomingMessageSize();
                 unAckedMessageTracker.clear();
             }
             cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
@@ -1762,7 +1762,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     public int clearIncomingMessagesAndGetMessageNumber() {
         int messagesNumber = incomingMessages.size();
         incomingMessages.clear();
-        INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+        resetIncomingMessageSize();
         unAckedMessageTracker.clear();
         return messagesNumber;
     }
@@ -1916,7 +1916,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             lastDequeuedMessageId = MessageId.earliest;
 
             incomingMessages.clear();
-            INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+            resetIncomingMessageSize();
             seekFuture.complete(null);
         }).exceptionally(e -> {
             log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
@@ -1977,7 +1977,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             lastDequeuedMessageId = MessageId.earliest;
 
             incomingMessages.clear();
-            INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+            resetIncomingMessageSize();
             seekFuture.complete(null);
         }).exceptionally(e -> {
             log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
@@ -2222,7 +2222,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             // try not to remove elements that are added while we remove
             Message<T> message = incomingMessages.poll();
             while (message != null) {
-                INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+                updateIncomingMessageSize(message);
                 messagesFromQueue++;
                 MessageIdImpl id = getMessageIdImpl(message);
                 if (!messageIds.contains(id)) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 49e31f4..f59c2c6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -311,7 +311,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     @Override
     protected synchronized void messageProcessed(Message<?> msg) {
         unAckedMessageTracker.add(msg.getMessageId());
-        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
+        updateIncomingMessageSize(msg);
     }
 
     private void resumeReceivingFromPausedConsumersIfNeeded() {
@@ -334,7 +334,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         Message<T> message;
         try {
             message = incomingMessages.take();
-            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+            updateIncomingMessageSize(message);
             checkState(message instanceof TopicMessageImpl);
             unAckedMessageTracker.add(message.getMessageId());
             resumeReceivingFromPausedConsumersIfNeeded();
@@ -350,7 +350,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         try {
             message = incomingMessages.poll(timeout, unit);
             if (message != null) {
-                INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+                updateIncomingMessageSize(message);
                 checkArgument(message instanceof TopicMessageImpl);
                 unAckedMessageTracker.add(message.getMessageId());
             }
@@ -391,7 +391,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 while (msgPeeked != null && messages.canAdd(msgPeeked)) {
                     Message<T> msg = incomingMessages.poll();
                     if (msg != null) {
-                        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
+                        updateIncomingMessageSize(msg);
                         Message<T> interceptMsg = beforeConsume(msg);
                         messages.add(interceptMsg);
                     }
@@ -419,7 +419,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             pendingReceives.add(result);
             cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
         } else {
-            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+            updateIncomingMessageSize(message);
             checkState(message instanceof TopicMessageImpl);
             unAckedMessageTracker.add(message.getMessageId());
             resumeReceivingFromPausedConsumersIfNeeded();
@@ -625,7 +625,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 consumer.unAckedChunckedMessageIdSequenceMap.clear();
             });
             incomingMessages.clear();
-            INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+            resetIncomingMessageSize();
             unAckedMessageTracker.clear();
         } finally {
             lock.writeLock().unlock();
@@ -694,7 +694,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
         unAckedMessageTracker.clear();
         incomingMessages.clear();
-        MultiTopicsConsumerImpl.INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+        resetIncomingMessageSize();
 
         FutureUtil.waitForAll(futures).whenComplete((result, exception) -> {
             if (exception != null) {
@@ -784,7 +784,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             Message<T> message = incomingMessages.poll();
             checkState(message instanceof TopicMessageImpl);
             while (message != null) {
-                INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
+                updateIncomingMessageSize(message);
                 MessageId messageId = message.getMessageId();
                 if (!messageIds.contains(messageId)) {
                     messageIds.add(messageId);


[pulsar] 02/03: [cpp-client] Fix compilation issue caused by non-virtual destructor in consumer. (#9105) (#9106)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 44a70d993bb11ff5555a96901401028a82e1867f
Author: Tong <to...@users.noreply.github.com>
AuthorDate: Tue Jan 5 04:36:23 2021 +0800

    [cpp-client] Fix compilation issue caused by non-virtual destructor in consumer. (#9105) (#9106)
    
    Related issue: #9105
    
    (cherry picked from commit 68be899b7a80fb4a7ae3e7f5e82ddf29a0791c6d)
---
 pulsar-client-cpp/include/pulsar/Consumer.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
index 86b5bb3..e049d41 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -38,6 +38,7 @@ class PULSAR_PUBLIC Consumer {
      * Construct an uninitialized consumer object
      */
     Consumer();
+    virtual ~Consumer() = default;
 
     /**
      * @return the topic this consumer is subscribed to