You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/04/08 04:56:09 UTC

[pulsar] branch branch-2.8 updated (f657bf8a36d -> d81bb184aba)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from f657bf8a36d [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)
     new 81beccdbf0b [Client] Add test to ensure the message order in listener callbacks (#15049)
     new d81bb184aba [C++] Fix single message metadata not set correctly (#15072)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../client/api/SimpleProducerConsumerTest.java     | 30 +++++++++
 pulsar-client-cpp/lib/Commands.cc                  | 31 ++++++---
 pulsar-client-cpp/lib/Message.cc                   | 26 ++++++++
 pulsar-client-cpp/tests/BatchMessageTest.cc        | 78 ++++++++++++++++++++++
 pulsar-client-cpp/tests/SchemaTest.cc              | 34 ++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  9 ++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 10 ---
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  2 +-
 9 files changed, 197 insertions(+), 27 deletions(-)


[pulsar] 02/02: [C++] Fix single message metadata not set correctly (#15072)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d81bb184abaa7a4b97375327c25c08df43e13b09
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Apr 8 12:35:27 2022 +0800

    [C++] Fix single message metadata not set correctly (#15072)
    
    ### Motivation
    
    Recently I found that messages sent by the C++ producer don't have the
    schema version, which prevents a Java consumer from consuming them with
    the `AUTO_CONSUME` schema. After rechecking the code, I found the C++
    client doesn't set single message metadata correctly, i.e. when batching
    is enabled, some messages' metadata could be wrong.
    
    - In `initBatchMessageMetadata`, the schema version is not set.
    - In `serializeSingleMessageInBatchWithPayload`, the ordering key and
      the sequence id are not set.
    
    In addition, when a C++ consumer consumes batched messages from a Java
    producer, some metadata might be wrong. Because even for batched
    messages, Java producer also sets the partition key and the ordering
    key. It's redundant because only keys in `SingleMessageMetadata` should
    be set. To prevent the 2nd and later single messages in the batch from
    reusing the keys in `MessageMetadata`, the Java client clears these fields
    if the `SingleMessageMetadata` doesn't contain them when a `MessageImpl`
    is constructed.
    
    ### Modifications
    
    - Set the fields that were not set before when creating a batch. Some
      fields like `null_value` and `null_partition_key` are not set because
      they are not supported by C++ client at this moment.
    - Use a more efficient way to copy the repeated fields of ProtoBuf.
    - Clear some fields when they are not contained by the
      `SingleMessageMetadata` object when creating a `MessageImpl` so that
      the behavior is consistent with the Java client.
    
    ### Verifying this change
    
    The following tests are added:
    - `BatchMessageTest.testSingleMessageMetadata`: test 3 single messages
      in batch are consumed successfully, i.e. the correct metadata is received.
    - `SchemaTest.testHasSchemaVersion`: test that when a schema is configured,
      all messages have the schema version.
    - The validation for schema version is also added to
      `ProtobufNativeSchemaTest.testEndToEnd`.
    
    (cherry picked from commit 6f41fdebf642b08e2c3f45f3574963ec0936868c)
    
    Resolve the conflicts by deleting `ProtobufNativeSchemaTest.cc` because
    it only exists from branch-2.9.
---
 pulsar-client-cpp/lib/Commands.cc           | 31 ++++++++----
 pulsar-client-cpp/lib/Message.cc            | 26 ++++++++++
 pulsar-client-cpp/tests/BatchMessageTest.cc | 78 +++++++++++++++++++++++++++++
 pulsar-client-cpp/tests/SchemaTest.cc       | 34 +++++++++++++
 4 files changed, 158 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 95e0cc792b6..48beb98bfbd 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -677,26 +677,35 @@ void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::Messa
             batchMetadata.add_replicate_to(metadata.replicate_to(i));
         }
     }
-    // TODO: set other optional fields
+    if (metadata.has_schema_version()) {
+        batchMetadata.set_schema_version(metadata.schema_version());
+    }
 }
 
 uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
                                                             unsigned long maxMessageSizeInBytes) {
+    const auto& msgMetadata = msg.impl_->metadata;
     SingleMessageMetadata metadata;
-    if (msg.impl_->hasPartitionKey()) {
-        metadata.set_partition_key(msg.impl_->getPartitionKey());
+    if (msgMetadata.has_partition_key()) {
+        metadata.set_partition_key(msgMetadata.partition_key());
+    }
+    if (msgMetadata.has_ordering_key()) {
+        metadata.set_ordering_key(msgMetadata.ordering_key());
     }
 
-    for (MessageBuilder::StringMap::const_iterator it = msg.impl_->properties().begin();
-         it != msg.impl_->properties().end(); it++) {
-        proto::KeyValue* keyValue = proto::KeyValue().New();
-        keyValue->set_key(it->first);
-        keyValue->set_value(it->second);
+    metadata.mutable_properties()->Reserve(msgMetadata.properties_size());
+    for (int i = 0; i < msgMetadata.properties_size(); i++) {
+        auto keyValue = proto::KeyValue().New();
+        *keyValue = msgMetadata.properties(i);
         metadata.mutable_properties()->AddAllocated(keyValue);
     }
 
-    if (msg.impl_->getEventTimestamp() != 0) {
-        metadata.set_event_time(msg.impl_->getEventTimestamp());
+    if (msgMetadata.has_event_time()) {
+        metadata.set_event_time(msgMetadata.event_time());
+    }
+
+    if (msgMetadata.has_sequence_id()) {
+        metadata.set_sequence_id(msgMetadata.sequence_id());
     }
 
     // Format of batch message
@@ -726,7 +735,7 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
     batchPayLoad.bytesWritten(msgMetadataSize);
     batchPayLoad.write(msg.impl_->payload.data(), payloadSize);
 
-    return msg.impl_->metadata.sequence_id();
+    return msgMetadata.sequence_id();
 }
 
 Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) {
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 76e408ffef4..b928945cfae 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -79,12 +79,38 @@ Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, S
     impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
     impl_->topicName_ = &topicName;
 
+    impl_->metadata.clear_properties();
+    if (singleMetadata.properties_size() > 0) {
+        impl_->metadata.mutable_properties()->Reserve(singleMetadata.properties_size());
+        for (int i = 0; i < singleMetadata.properties_size(); i++) {
+            auto keyValue = proto::KeyValue().New();
+            *keyValue = singleMetadata.properties(i);
+            impl_->metadata.mutable_properties()->AddAllocated(keyValue);
+        }
+    }
+
     if (singleMetadata.has_partition_key()) {
         impl_->metadata.set_partition_key(singleMetadata.partition_key());
+    } else {
+        impl_->metadata.clear_partition_key();
+    }
+
+    if (singleMetadata.has_ordering_key()) {
+        impl_->metadata.set_ordering_key(singleMetadata.ordering_key());
+    } else {
+        impl_->metadata.clear_ordering_key();
     }
 
     if (singleMetadata.has_event_time()) {
         impl_->metadata.set_event_time(singleMetadata.event_time());
+    } else {
+        impl_->metadata.clear_event_time();
+    }
+
+    if (singleMetadata.has_sequence_id()) {
+        impl_->metadata.set_sequence_id(singleMetadata.sequence_id());
+    } else {
+        impl_->metadata.clear_sequence_id();
     }
 }
 
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index ebe6bf159ba..62fd5fff25c 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -1071,3 +1071,81 @@ TEST(BatchMessageTest, testProducerQueueWithBatches) {
 
     ASSERT_EQ(rejectedMessges, 10);
 }
+
+TEST(BatchMessageTest, testSingleMessageMetadata) {
+    const auto topic = "BatchMessageTest-SingleMessageMetadata-" + std::to_string(time(nullptr));
+    constexpr int numMessages = 3;
+
+    Client client(lookupUrl);
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(
+                            topic, ProducerConfiguration().setBatchingMaxMessages(numMessages), producer));
+
+    producer.sendAsync(MessageBuilder()
+                           .setContent("msg-0")
+                           .setPartitionKey("key-0")
+                           .setOrderingKey("ordering-key-0")
+                           .setEventTimestamp(10UL)
+                           .setProperty("k0", "v0")
+                           .setProperty("k1", "v1")
+                           .build(),
+                       nullptr);
+    producer.sendAsync(MessageBuilder()
+                           .setContent("msg-1")
+                           .setOrderingKey("ordering-key-1")
+                           .setEventTimestamp(11UL)
+                           .setProperty("k2", "v2")
+                           .build(),
+                       nullptr);
+    producer.sendAsync(MessageBuilder().setContent("msg-2").build(), nullptr);
+    ASSERT_EQ(ResultOk, producer.flush());
+
+    Message msgs[numMessages];
+    for (int i = 0; i < numMessages; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+        msgs[i] = msg;
+        LOG_INFO("message " << i << ": " << msg.getDataAsString()
+                            << ", key: " << (msg.hasPartitionKey() ? msg.getPartitionKey() : "(null)")
+                            << ", ordering key: " << (msg.hasOrderingKey() ? msg.getOrderingKey() : "(null)")
+                            << ", event time: " << (msg.getEventTimestamp())
+                            << ", properties count: " << msg.getProperties().size()
+                            << ", has schema version: " << msg.hasSchemaVersion());
+    }
+
+    ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
+    ASSERT_TRUE(msgs[0].hasPartitionKey());
+    ASSERT_EQ(msgs[0].getPartitionKey(), "key-0");
+    ASSERT_TRUE(msgs[0].hasOrderingKey());
+    ASSERT_EQ(msgs[0].getOrderingKey(), "ordering-key-0");
+    ASSERT_EQ(msgs[0].getEventTimestamp(), 10UL);
+    ASSERT_EQ(msgs[0].getProperties().size(), 2);
+    ASSERT_TRUE(msgs[0].hasProperty("k0"));
+    ASSERT_EQ(msgs[0].getProperty("k0"), "v0");
+    ASSERT_TRUE(msgs[0].hasProperty("k1"));
+    ASSERT_EQ(msgs[0].getProperty("k1"), "v1");
+    ASSERT_FALSE(msgs[0].hasSchemaVersion());
+
+    ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
+    ASSERT_FALSE(msgs[1].hasPartitionKey());
+    ASSERT_TRUE(msgs[1].hasOrderingKey());
+    ASSERT_EQ(msgs[1].getOrderingKey(), "ordering-key-1");
+    ASSERT_EQ(msgs[1].getEventTimestamp(), 11UL);
+    ASSERT_EQ(msgs[1].getProperties().size(), 1);
+    ASSERT_TRUE(msgs[1].hasProperty("k2"));
+    ASSERT_EQ(msgs[1].getProperty("k2"), "v2");
+    ASSERT_FALSE(msgs[1].hasSchemaVersion());
+
+    ASSERT_EQ(msgs[2].getDataAsString(), "msg-2");
+    ASSERT_FALSE(msgs[2].hasPartitionKey());
+    ASSERT_FALSE(msgs[2].hasOrderingKey());
+    ASSERT_EQ(msgs[2].getEventTimestamp(), 0UL);
+    ASSERT_EQ(msgs[2].getProperties().size(), 0);
+    ASSERT_FALSE(msgs[2].hasSchemaVersion());
+
+    client.close();
+}
diff --git a/pulsar-client-cpp/tests/SchemaTest.cc b/pulsar-client-cpp/tests/SchemaTest.cc
index e11069fb06b..45491b9cf81 100644
--- a/pulsar-client-cpp/tests/SchemaTest.cc
+++ b/pulsar-client-cpp/tests/SchemaTest.cc
@@ -70,3 +70,37 @@ TEST(SchemaTest, testSchema) {
     res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
     ASSERT_EQ(ResultIncompatibleSchema, res);
 }
+
+TEST(SchemaTest, testHasSchemaVersion) {
+    Client client(lookupUrl);
+    std::string topic = "SchemaTest-HasSchemaVersion";
+    SchemaInfo stringSchema(SchemaType::STRING, "String", "");
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic + "1", "sub", ConsumerConfiguration().setSchema(stringSchema),
+                                         consumer));
+    Producer batchedProducer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema),
+                                              batchedProducer));
+    Producer nonBatchedProducer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema),
+                                              nonBatchedProducer));
+
+    ASSERT_EQ(ResultOk, batchedProducer.send(MessageBuilder().setContent("msg-0").build()));
+    ASSERT_EQ(ResultOk, nonBatchedProducer.send(MessageBuilder().setContent("msg-1").build()));
+
+    Message msgs[2];
+    ASSERT_EQ(ResultOk, consumer.receive(msgs[0], 3000));
+    ASSERT_EQ(ResultOk, consumer.receive(msgs[1], 3000));
+
+    std::string schemaVersion(8, '\0');
+    ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
+    ASSERT_TRUE(msgs[0].hasSchemaVersion());
+    ASSERT_EQ(msgs[0].getSchemaVersion(), schemaVersion);
+
+    ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
+    ASSERT_TRUE(msgs[1].hasSchemaVersion());
+    ASSERT_EQ(msgs[1].getSchemaVersion(), schemaVersion);
+
+    client.close();
+}


[pulsar] 01/02: [Client] Add test to ensure the message order in listener callbacks (#15049)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 81beccdbf0bac4e8b6197f8dc897d46b78c13a37
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Apr 7 14:55:16 2022 +0800

    [Client] Add test to ensure the message order in listener callbacks (#15049)
    
    (cherry picked from commit c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479)
    
    Resolve the conflicts by removing unused `isTxnMessage` and
    `tryTriggerListener` methods from `ConsumerImpl`.
---
 .../client/api/SimpleProducerConsumerTest.java     | 30 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  9 +++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 10 --------
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 +--
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  2 +-
 5 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 86ee045b258..d2b67575324 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -59,6 +59,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
@@ -4250,4 +4251,33 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test(invocationCount = 5)
+    public void testListenerOrdering() throws Exception {
+        final String topic = "persistent://my-property/my-ns/test-listener-ordering-" + System.currentTimeMillis();
+        final int numMessages = 1000;
+        final CountDownLatch latch = new CountDownLatch(numMessages);
+        final List<String> values = new CopyOnWriteArrayList<>();
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .messageListener((MessageListener<String>) (consumer1, msg) -> {
+                    values.add(msg.getValue());
+                    latch.countDown();
+                })
+                .subscribe();
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("msg-" + i);
+        }
+        latch.await(3, TimeUnit.SECONDS);
+        producer.close();
+        consumer.close();
+        assertEquals(values.size(), numMessages);
+        for (int i = 0; i < numMessages; i++) {
+            assertEquals(values.get(i), "msg-" + i);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dd21666f5da..7bc63d89f98 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -906,8 +906,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
-    protected void triggerListener() {
-        // Use internalPinnedExecutor to maintain message ordering
+    protected void tryTriggerListener() {
+        if (listener != null) {
+            triggerListener();
+        }
+    }
+
+    private void triggerListener() {
         internalPinnedExecutor.execute(() -> {
             try {
                 // Listener should only have one pending/running executable to process a message
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8104123e4c3..f680d1e4f88 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1090,16 +1090,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     }
 
-    private void tryTriggerListener() {
-        if (listener != null) {
-            triggerListener();
-        }
-    }
-
-    private boolean isTxnMessage(MessageMetadata messageMetadata) {
-        return messageMetadata.hasTxnidMostBits() && messageMetadata.hasTxnidLeastBits();
-    }
-
     private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
             MessageIdData messageId, ClientCnx cnx) {
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fd98bb6fb4b..bcccd09d7c7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -301,9 +301,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             notifyPendingBatchReceivedCallBack();
         }
 
-        if (listener != null) {
-            triggerListener();
-        }
+        tryTriggerListener();
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index f6302e3779c..fca9cffcfdc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -173,7 +173,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
     }
 
     @Override
-    protected void triggerListener() {
+    protected void tryTriggerListener() {
         // Ignore since it was already triggered in the triggerZeroQueueSizeListener() call
     }