You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/08 06:56:06 UTC

[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17209: [feature][client-cpp] Support inclusive seek for cpp client

BewareMyPower commented on code in PR #17209:
URL: https://github.com/apache/pulsar/pull/17209#discussion_r965559130


##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -170,23 +172,25 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     cnx->registerConsumer(consumerId_, shared_from_this());
 
     Lock lockForMessageId(mutexForMessageId_);
-    Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
-    if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
-        // Update startMessageId so that we can discard messages after delivery
-        // restarts
-        startMessageId_ = firstMessageInQueue;
-    }
-    const auto startMessageId = startMessageId_;
+    // Update startMessageId so that we can discard messages after delivery restarts
+    startMessageId_ = clearReceiveQueue();
+    const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
+                                        ? startMessageId_.get()
+                                        : Optional<MessageId>::empty();
     lockForMessageId.unlock();

Review Comment:
   ```suggestion
       // Update startMessageId so that we can discard messages after delivery restarts
       const auto startMessageId = clearReceiveQueue();
       lockForMessageId.unlock();
       const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
                                           ? startMessageId
                                           : Optional<MessageId>::empty();
       startMessageId_ = startMessageId;
   ```
   
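   The combination of `Synchronized::operator=` and `Synchronized::get` is not atomic. We can store the result of `clearReceiveQueue()` in a local variable first, then use that local both to compute `subscribeMessageId` and to update `startMessageId_`.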



##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -1380,4 +1365,52 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
 
+void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId,
+                                     long timestamp, ResultCallback callback) {
+    ClientConnectionPtr cnx = getCnx().lock();
+    if (!cnx) {
+        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+        callback(ResultNotConnected);
+        return;
+    }
+
+    const auto originalSeekMessageId = seekMessageId_.get();
+    seekMessageId_ = seekId;
+    duringSeek_ = true;
+    if (timestamp > 0) {
+        LOG_INFO(getName() << " Seeking subscription to " << timestamp);
+    } else {
+        LOG_INFO(getName() << " Seeking subscription to " << seekId);
+    }
+
+    std::weak_ptr<ConsumerImpl> weakSelf{shared_from_this()};
+
+    cnx->sendRequestWithId(seek, requestId)
+        .addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
+                                                                       const ResponseData& responseData) {

Review Comment:
   We must check whether `this` is still valid by locking `weakSelf` and verifying that the returned pointer is non-null.
   
   ```c++
   auto self = weakSelf.lock();
   if (!self) {  // this is invalid now
   ```



##########
pulsar-client-cpp/lib/ConsumerImpl.h:
##########
@@ -46,6 +46,8 @@
 #include <lib/stats/ConsumerStatsDisabled.h>
 #include <queue>
 #include <atomic>
+#include "SharedBuffer.h"

Review Comment:
   ```suggestion
   ```
   
   The new code doesn't introduce a dependency on `SharedBuffer`.



##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -170,23 +172,25 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     cnx->registerConsumer(consumerId_, shared_from_this());
 
     Lock lockForMessageId(mutexForMessageId_);
-    Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
-    if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
-        // Update startMessageId so that we can discard messages after delivery
-        // restarts
-        startMessageId_ = firstMessageInQueue;
-    }
-    const auto startMessageId = startMessageId_;
+    // Update startMessageId so that we can discard messages after delivery restarts
+    startMessageId_ = clearReceiveQueue();
+    const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
+                                        ? startMessageId_.get()
+                                        : Optional<MessageId>::empty();
     lockForMessageId.unlock();
 
     unAckedMessageTrackerPtr_->clear();
     batchAcknowledgementTracker_.clear();
 
     ClientImplPtr client = client_.lock();
     uint64_t requestId = client->newRequestId();
+    if (duringSeek_) {
+        ackGroupingTrackerPtr_->flushAndClean();
+    }

Review Comment:
   It looks like this `flushAndClean()` call should be placed before `clearReceiveQueue` is called, as in the Java client; see https://github.com/apache/pulsar/blob/2848fa0da09e035951220c3d04138041e1477e60/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L820-L831



##########
pulsar-client-cpp/lib/ConsumerImpl.h:
##########
@@ -169,6 +170,8 @@ class ConsumerImpl : public ConsumerImplBase,
     void drainIncomingMessageQueue(size_t count);
     uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
                                                 int redeliveryCount);
+    bool isPriorBatchIndex(long idx);
+    bool isPriorEntryIndex(long idx);

Review Comment:
   These two methods could be `const`. Also, the parameter type should not be `long`, even though the Java client uses `long`.
   
   The type of batch index is `int32_t`, and the type of entry index is `int64_t`, see the `batchIndex()` and `entryId()` methods of `MessageId`.



##########
pulsar-client-cpp/tests/ConsumerTest.cc:
##########
@@ -797,4 +797,68 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+class ConsumerSeekTest : public ::testing::TestWithParam<bool> {
+   public:
+    void SetUp() override { producerConf_ = ProducerConfiguration().setBatchingEnabled(GetParam()); }
+
+    void TearDown() override { client_.close(); }
+
+   protected:
+    Client client_{lookupUrl};
+    ProducerConfiguration producerConf_;
+};
+
+TEST_P(ConsumerSeekTest, testSeekForMessageId) {
+    Client client(lookupUrl);
+    auto n = std::chrono::system_clock::now();
+    auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(n.time_since_epoch());
+
+    const std::string topic = "test-seek-for-message-id-" + std::to_string(now.count());

Review Comment:
   We can simply use `std::to_string(time(nullptr))` as the topic suffix like other tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org