You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/11 08:49:48 UTC

[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14604: [C++] add chunk message id

BewareMyPower commented on a change in pull request #14604:
URL: https://github.com/apache/pulsar/pull/14604#discussion_r824456756



##########
File path: pulsar-client-cpp/lib/Message.cc
##########
@@ -70,6 +70,14 @@ Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metad
     impl_->payload = payload;
 }
 
+Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
+                 int32_t partition)

Review comment:
       The `partition` parameter is never used.

##########
File path: pulsar-client-cpp/lib/MessageId.cc
##########
@@ -56,18 +66,21 @@ const MessageId& MessageId::latest() {
     return _latest;
 }
 
-void MessageId::serialize(std::string& result) const {
-    proto::MessageIdData idData;
-    idData.set_ledgerid(impl_->ledgerId_);
-    idData.set_entryid(impl_->entryId_);
-    if (impl_->partition_ != -1) {
-        idData.set_partition(impl_->partition_);
+bool MessageId::isChunkMessageid() const {
+    if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_) != nullptr) {
+        return true;
     }
+    return false;
+}
 
-    if (impl_->batchIndex_ != -1) {
-        idData.set_batch_index(impl_->batchIndex_);
+void MessageId::serialize(std::string& result) const {
+    proto::MessageIdData idData;
+    writeMessageIdData(impl_, &idData);
+    if (isChunkMessageid() == true) {

Review comment:
       ```suggestion
       if (isChunkMessageid()) {
   ```

##########
File path: pulsar-client-cpp/lib/ChunkMessageIdImpl.h
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include "MessageIdImpl.h"
+
+namespace pulsar {
+
+class MessageIdImpl;
+
+class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this<ChunkMessageIdImpl> {
+   public:
+    ChunkMessageIdImpl(MessageIdImpl& firstChunkMsgId, MessageIdImpl& lastChunkMsgId)

Review comment:
       ```suggestion
       ChunkMessageIdImpl(const MessageIdImpl& firstChunkMsgId, const MessageIdImpl& lastChunkMsgId)
   ```

##########
File path: pulsar-client-cpp/lib/MessageId.cc
##########
@@ -56,18 +66,21 @@ const MessageId& MessageId::latest() {
     return _latest;
 }
 
-void MessageId::serialize(std::string& result) const {
-    proto::MessageIdData idData;
-    idData.set_ledgerid(impl_->ledgerId_);
-    idData.set_entryid(impl_->entryId_);
-    if (impl_->partition_ != -1) {
-        idData.set_partition(impl_->partition_);
+bool MessageId::isChunkMessageid() const {
+    if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_) != nullptr) {
+        return true;
     }
+    return false;
+}
 
-    if (impl_->batchIndex_ != -1) {
-        idData.set_batch_index(impl_->batchIndex_);
+void MessageId::serialize(std::string& result) const {
+    proto::MessageIdData idData;
+    writeMessageIdData(impl_, &idData);
+    if (isChunkMessageid() == true) {

Review comment:
       BTW, I think it's better to cast the pointer first and then check whether the result is non-null (i.e. whether it's a `ChunkMessageIdImpl`).
   
   ```c++
   auto chunkMsgIdImpl = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
   if (chunkMsgIdImpl) {
       /* ... */
   }
   ```

##########
File path: pulsar-client-cpp/tests/ChunkMessageIdImplTest.cc
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/MessageId.h>
+#include "lib/MessageIdUtil.h"
+#include "ChunkMessageIdImpl.h"
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+using namespace pulsar;
+
+TEST(ChunkMessageIdImplTest, testSerialization) {
+    MessageId msgId(MessageId(1, 1, 1, 1), MessageId(2, 2, 2, 2));
+
+    std::string serialized;
+    msgId.serialize(serialized);
+
+    MessageId deserialized = MessageId::deserialize(serialized);
+
+    ASSERT_EQ(msgId, deserialized);
+}
+
+TEST(ChunkMessageIdImplTest, testCompareLedgerAndEntryId) {
+    MessageId id1(MessageId(1, 1, 1, 1), MessageId(-1, 2L, 1L, 0));
+    MessageId id2(MessageId(1, 1, 1, 1), MessageId(-1, 2L, 1L, 1));
+    MessageId id3(MessageId(1, 1, 1, 1), MessageId(-1, 2L, 2L, 0));
+    MessageId id4(MessageId(1, 1, 1, 1), MessageId(-1, 3L, 0L, 0));

Review comment:
       Either using or not using the implicit conversion is okay, but it's better to keep the constructor call style consistent.
   
   i.e. both
   
   ```c++
   MessageId id1(1, 1, 1, 1);
   MessageId id2(1, 2, 3, 4);
   ```
   
   and
   
   ```c++
   MessageId id1(1, 1L, 1L, 1);
   MessageId id2(1, 2L, 3L, 4);
   ```
   
   are okay. But
   
   ```c++
   MessageId id1(1, 1, 1, 1);
   MessageId id2(1, 2L, 3L, 4);
   ```
   
   looks bad.

##########
File path: pulsar-client-cpp/lib/OpSendMsg.h
##########
@@ -25,9 +25,26 @@
 
 #include "TimeUtils.h"
 #include "MessageImpl.h"
+#include "ChunkMessageIdImpl.h"
 
 namespace pulsar {
 
+struct ProducerChunkedMessageCtx {
+   public:
+    ProducerChunkedMessageCtx() = default;
+
+    ProducerChunkedMessageCtx(MessageIdImplPtr firstChunkMessageId, MessageIdImplPtr lastChunkMessageId)

Review comment:
       Even though it's a shared pointer, please pass it by const reference (not only here).
   
   See https://stackoverflow.com/questions/3310737/should-we-pass-a-shared-ptr-by-reference-or-by-value

##########
File path: pulsar-client-cpp/lib/ConsumerImpl.cc
##########
@@ -1229,7 +1244,17 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
         LOG_DEBUG(getName() << " Sending seek Command for Consumer - " << getConsumerId() << ", requestId - "
                             << requestId);
         Future<Result, ResponseData> future =
-            cnx->sendRequestWithId(Commands::newSeek(consumerId_, requestId, msgId), requestId);
+            msgId.isChunkMessageid()
+                ? cnx->sendRequestWithId(
+                      Commands::newSeek(consumerId_, requestId,
+                                        std::dynamic_pointer_cast<ChunkMessageIdImpl>(msgId.impl_)
+                                            ->getFirstChunkMessageIdImpl()

Review comment:
       Did you make sure the cast pointer is not null?

##########
File path: pulsar-client-cpp/lib/MessageId.cc
##########
@@ -92,8 +110,20 @@ int32_t MessageId::batchIndex() const { return impl_->batchIndex_; }
 int32_t MessageId::partition() const { return impl_->partition_; }
 
 PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
-    s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
-      << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
+    std::function<void(std::ostream&, const MessageIdImplPtr)> printMsgIdImpl =
+        [](std::ostream& s, const MessageIdImplPtr impl) {
+            s << '(' << impl->ledgerId_ << ',' << impl->entryId_ << ',' << impl->partition_ << ','
+              << impl->batchIndex_ << ')';
+        };
+    if (messageId.isChunkMessageid() == false) {
+        printMsgIdImpl(s, messageId.impl_);
+    } else {
+        auto firstMessageidImplPtr =
+            std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_)->getFirstChunkMessageIdImpl();
+        printMsgIdImpl(s, firstMessageidImplPtr);
+        s << "->";
+        printMsgIdImpl(s, messageId.impl_);
+    }

Review comment:
       It's better to use:
   
   ```c++
   const auto chunkMsgIdImpl = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
   if (chunkMsgIdImpl) {
       // TODO: print a chunked msg id
   } else {
       // TODO: print a normal msg id
   }
   ```

##########
File path: pulsar-client-cpp/lib/MessageId.cc
##########
@@ -45,6 +47,14 @@ MessageId& MessageId::operator=(const MessageId& m) {
 MessageId::MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex)
     : impl_(std::make_shared<MessageIdImpl>(partition, ledgerId, entryId, batchIndex)) {}
 
+MessageId::MessageId(const MessageId& firstMessageId, const MessageId& lastMessageId) {
+    auto firstImpl = firstMessageId.impl_, lastImpl = lastMessageId.impl_;

Review comment:
       It's better not to define multiple variables on one line.

##########
File path: pulsar-client-cpp/tests/MessageChunkSeekTest.cc
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <ctime>
+#include <random>
+
+#include <pulsar/Client.h>
+#include <gtest/gtest.h>
+#include "lib/LogUtils.h"
+#include "PulsarFriend.h"
+#include "MessageIdImpl.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+// See the `maxMessageSize` config in test-conf/standalone-ssl.conf
+static constexpr size_t maxMessageSize = 1024000;
+
+static std::string toString(CompressionType compressionType) {
+    switch (compressionType) {
+        case CompressionType::CompressionNone:
+            return "None";
+        case CompressionType::CompressionLZ4:
+            return "LZ4";
+        case CompressionType::CompressionZLib:
+            return "ZLib";
+        case CompressionType::CompressionZSTD:
+            return "ZSTD";
+        case CompressionType::CompressionSNAPPY:
+            return "SNAPPY";
+        default:
+            return "Unknown (" + std::to_string(compressionType) + ")";
+    }
+}
+
+inline std::string createLargeMessage() {
+    std::string largeMessage(maxMessageSize * 3, 'a');
+    std::default_random_engine e(time(nullptr));
+    std::uniform_int_distribution<unsigned> u(0, 25);
+    for (size_t i = 0; i < largeMessage.size(); i++) {
+        largeMessage[i] = 'a' + u(e);
+    }
+    return largeMessage;
+}
+
+class MessageChunkingSeekTest : public ::testing::TestWithParam<CompressionType> {
+   public:
+    static std::string largeMessage;
+
+    void TearDown() override { client_.close(); }
+
+    void createProducer(const std::string& topic, Producer& producer) {
+        ProducerConfiguration conf;
+        conf.setBatchingEnabled(false);
+        conf.setChunkingEnabled(true);
+        conf.setCompressionType(GetParam());
+        LOG_INFO("Create producer to topic: " << topic
+                                              << ", compression: " << toString(conf.getCompressionType()));
+        ASSERT_EQ(ResultOk, client_.createProducer(topic, conf, producer));
+    }
+
+    void createConsumer(const std::string& topic, Consumer& consumer) {
+        ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
+    }
+
+   private:
+    Client client_{lookupUrl};
+};
+
+std::string MessageChunkingSeekTest::largeMessage = createLargeMessage();
+
+TEST_P(MessageChunkingSeekTest, testSeek) {
+    const std::string topic =
+        "MessageChunkingSeekTest-testSeek-" + toString(GetParam()) + std::to_string(time(nullptr));
+    Consumer consumer;
+    createConsumer(topic, consumer);
+    Producer producer;
+    createProducer(topic, producer);
+
+    constexpr int numMessages = 20;
+
+    std::vector<MessageId> receiveMessageIds;
+    for (int i = 0; i < numMessages; i++) {
+        MessageId messageId;
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
+        LOG_INFO("Send " << i << " to " << messageId);
+    }
+
+    Message msg;
+    for (int i = 0; i < numMessages; i++) {
+        consumer.receive(msg, 3000);
+        receiveMessageIds.push_back(msg.getMessageId());
+    }
+
+    consumer.seek(receiveMessageIds[1]);

Review comment:
       I see you just copied `MessageChunkingTest.cc`, only adding this line and the ones below. There is too much repeated code. Could you just add `testSeekChunkMessage` to `MessageChunkingTest`? And since this test is only responsible for verifying that seek works, there is no need to test all the compression types. The test should be simpler.
   
   See the similar test in Java client.
   
   https://github.com/apache/pulsar/blob/7998c44b0b85e8ae1af5fec64d8f873032877a2f/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java#L453-L508

##########
File path: pulsar-client-cpp/lib/MessageId.cc
##########
@@ -56,18 +66,21 @@ const MessageId& MessageId::latest() {
     return _latest;
 }
 
-void MessageId::serialize(std::string& result) const {
-    proto::MessageIdData idData;
-    idData.set_ledgerid(impl_->ledgerId_);
-    idData.set_entryid(impl_->entryId_);
-    if (impl_->partition_ != -1) {
-        idData.set_partition(impl_->partition_);
+bool MessageId::isChunkMessageid() const {
+    if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_) != nullptr) {
+        return true;
     }
+    return false;

Review comment:
       ```suggestion
       return std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_) != nullptr;
   ```

##########
File path: pulsar-client-cpp/lib/OpSendMsg.h
##########
@@ -25,9 +25,26 @@
 
 #include "TimeUtils.h"
 #include "MessageImpl.h"
+#include "ChunkMessageIdImpl.h"
 
 namespace pulsar {
 
+struct ProducerChunkedMessageCtx {
+   public:
+    ProducerChunkedMessageCtx() = default;
+
+    ProducerChunkedMessageCtx(MessageIdImplPtr firstChunkMessageId, MessageIdImplPtr lastChunkMessageId)
+        : firstChunkMessageIdImplPtr_(firstChunkMessageId), lastChunkMessageIdImplPtr_(lastChunkMessageId) {}
+
+    MessageIdImplPtr getChunkMessageId() {
+        return std::make_shared<ChunkMessageIdImpl>(*firstChunkMessageIdImplPtr_, *lastChunkMessageIdImplPtr_)
+            ->getLastChunkMessageIdImpl();
+    }
+
+    MessageIdImplPtr firstChunkMessageIdImplPtr_;
+    MessageIdImplPtr lastChunkMessageIdImplPtr_;

Review comment:
       Make them private. The reason `OpSendMsg`'s fields are not private is historical: some fields are accessed directly. I'll do some refactoring in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org