You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/08/25 06:59:32 UTC

[pulsar] branch master updated: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek (#16943)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cf8d441882c [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek (#16943)
cf8d441882c is described below

commit cf8d441882c2844a1330bcfcf309806956e8cfc2
Author: Zike Yang <zi...@apache.org>
AuthorDate: Thu Aug 25 14:59:24 2022 +0800

    [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek (#16943)
    
    Fixes #16761
    
    ### Motivation
    
    When C++ client calls `ConsumerImpl::getLastMessageIdAsync`, if the connection is not established, the callback will complete with `ResultNotConnected`.
    
    ### Modifications
    
    * Wait until the connection is established while getting the last message id.
---
 pulsar-client-cpp/lib/ConsumerImpl.cc   | 37 +++++++++++++++++++++++++++++++--
 pulsar-client-cpp/lib/ConsumerImpl.h    |  4 ++++
 pulsar-client-cpp/tests/ConsumerTest.cc | 32 ++++++++++++++++++++++++++++
 pulsar-client-cpp/tests/PulsarFriend.h  |  4 ++++
 pulsar-client-cpp/tests/ReaderTest.cc   | 27 ++++++++++++++++++++++++
 5 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 4e1d4f004cb..79c20d84649 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -1299,6 +1299,16 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
         return;
     }
 
+    TimeDuration operationTimeout = seconds(client_.lock()->conf().getOperationTimeoutSeconds());
+    BackoffPtr backoff = std::make_shared<Backoff>(milliseconds(100), operationTimeout * 2, milliseconds(0));
+    DeadlineTimerPtr timer = executor_->createDeadlineTimer();
+
+    internalGetLastMessageIdAsync(backoff, operationTimeout, timer, callback);
+}
+
+void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,
+                                                 const DeadlineTimerPtr& timer,
+                                                 BrokerGetLastMessageIdCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         if (cnx->getServerProtocolVersion() >= proto::v12) {
@@ -1326,8 +1336,31 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
             callback(ResultUnsupportedVersionError, MessageId());
         }
     } else {
-        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
-        callback(ResultNotConnected, MessageId());
+        TimeDuration next = std::min(remainTime, backoff->next());
+        if (next.total_milliseconds() <= 0) {
+            LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+            callback(ResultNotConnected, MessageId());
+            return;
+        }
+        remainTime -= next;
+
+        timer->expires_from_now(next);
+
+        auto self = shared_from_this();
+        timer->async_wait([this, backoff, remainTime, timer, next, callback,
+                           self](const boost::system::error_code& ec) -> void {
+            if (ec == boost::asio::error::operation_aborted) {
+                LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "].");
+                return;
+            }
+            if (ec) {
+                LOG_ERROR(getName() << " Failed to get last message id, code[" << ec << "].");
+                return;
+            }
+            LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in "
+                               << next.total_milliseconds() << " ms")
+            this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback);
+        });
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 346d3515ad7..78140f84b54 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -56,6 +56,7 @@ class ConsumerImpl;
 class BatchAcknowledgementTracker;
 typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
 typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;
+typedef std::shared_ptr<Backoff> BackoffPtr;
 
 enum ConsumerTopicType
 {
@@ -181,6 +182,9 @@ class ConsumerImpl : public ConsumerImplBase,
     void failPendingReceiveCallback();
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
     void trackMessage(const MessageId& messageId);
+    void internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,
+                                       const DeadlineTimerPtr& timer,
+                                       BrokerGetLastMessageIdCallback callback);
 
     Optional<MessageId> clearReceiveQueue();
 
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index adb94a4ad13..575abb69635 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -763,4 +763,36 @@ TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
     thread.join();
 }
 
+TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
+    int operationTimeout = 5;
+    ClientConfiguration clientConfiguration;
+    clientConfiguration.setOperationTimeoutSeconds(operationTimeout);
+
+    Client client(lookupUrl, clientConfiguration);
+    const std::string topic =
+        "testGetLastMessageIdBlockWhenConnectionDisconnected-" + std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
+
+    ConsumerImpl& consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    ClientConnectionWeakPtr conn = PulsarFriend::getClientConnection(consumerImpl);
+
+    PulsarFriend::setClientConnection(consumerImpl, std::weak_ptr<ClientConnection>());
+
+    pulsar::Latch latch(1);
+    auto start = TimeUtils::now();
+
+    consumerImpl.getLastMessageIdAsync([&latch](Result r, const GetLastMessageIdResponse&) -> void {
+        ASSERT_EQ(r, ResultNotConnected);
+        latch.countdown();
+    });
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(20)));
+    auto elapsed = TimeUtils::now() - start;
+
+    // getLastMessageIdAsync should be blocked until operationTimeout when the connection is disconnected.
+    ASSERT_GE(elapsed.seconds(), operationTimeout);
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index bc79fce0c4f..b74c0e1241b 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -116,6 +116,10 @@ class PulsarFriend {
 
     static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; }
 
+    static void setClientConnection(HandlerBase& handler, ClientConnectionWeakPtr conn) {
+        handler.connection_ = conn;
+    }
+
     static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
         return backoff.firstBackoffTime_;
     }
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index bf156927590..799702f9b4c 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -579,3 +579,30 @@ TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
     EXPECT_FALSE(hasMessageAvailable);
     client.close();
 }
+
+TEST(ReaderTest, testReceiveAfterSeek) {
+    Client client(serviceUrl);
+    const std::string topic = "reader-test-receive-after-seek-" + std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    MessageId seekMessageId;
+    for (int i = 0; i < 5; i++) {
+        MessageId messageId;
+        producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), messageId);
+        if (i == 3) {
+            seekMessageId = messageId;
+        }
+    }
+
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
+
+    reader.seek(seekMessageId);
+
+    bool hasMessageAvailable;
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+
+    client.close();
+}