You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/08/25 06:59:32 UTC
[pulsar] branch master updated: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek (#16943)
This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cf8d441882c [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek (#16943)
cf8d441882c is described below
commit cf8d441882c2844a1330bcfcf309806956e8cfc2
Author: Zike Yang <zi...@apache.org>
AuthorDate: Thu Aug 25 14:59:24 2022 +0800
[fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek (#16943)
Fixes #16761
### Motivation
When the C++ client calls `ConsumerImpl::getLastMessageIdAsync` while the connection is not established (for example, right after a seek), the callback completes immediately with `ResultNotConnected`.
### Modifications
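* Wait until the connection is established while getting the last message id: retry with backoff, and complete the callback with `ResultNotConnected` only once the operation timeout has elapsed.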
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 37 +++++++++++++++++++++++++++++++--
pulsar-client-cpp/lib/ConsumerImpl.h | 4 ++++
pulsar-client-cpp/tests/ConsumerTest.cc | 32 ++++++++++++++++++++++++++++
pulsar-client-cpp/tests/PulsarFriend.h | 4 ++++
pulsar-client-cpp/tests/ReaderTest.cc | 27 ++++++++++++++++++++++++
5 files changed, 102 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 4e1d4f004cb..79c20d84649 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -1299,6 +1299,16 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
return;
}
+ TimeDuration operationTimeout = seconds(client_.lock()->conf().getOperationTimeoutSeconds());
+ BackoffPtr backoff = std::make_shared<Backoff>(milliseconds(100), operationTimeout * 2, milliseconds(0));
+ DeadlineTimerPtr timer = executor_->createDeadlineTimer();
+
+ internalGetLastMessageIdAsync(backoff, operationTimeout, timer, callback);
+}
+
+void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,
+ const DeadlineTimerPtr& timer,
+ BrokerGetLastMessageIdCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
if (cnx->getServerProtocolVersion() >= proto::v12) {
@@ -1326,8 +1336,31 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
callback(ResultUnsupportedVersionError, MessageId());
}
} else {
- LOG_ERROR(getName() << " Client Connection not ready for Consumer");
- callback(ResultNotConnected, MessageId());
+ TimeDuration next = std::min(remainTime, backoff->next());
+ if (next.total_milliseconds() <= 0) {
+ LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+ callback(ResultNotConnected, MessageId());
+ return;
+ }
+ remainTime -= next;
+
+ timer->expires_from_now(next);
+
+ auto self = shared_from_this();
+ timer->async_wait([this, backoff, remainTime, timer, next, callback,
+ self](const boost::system::error_code& ec) -> void {
+ if (ec == boost::asio::error::operation_aborted) {
+ LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "].");
+ return;
+ }
+ if (ec) {
+ LOG_ERROR(getName() << " Failed to get last message id, code[" << ec << "].");
+ return;
+ }
+ LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in "
+ << next.total_milliseconds() << " ms")
+ this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback);
+ });
}
}
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 346d3515ad7..78140f84b54 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -56,6 +56,7 @@ class ConsumerImpl;
class BatchAcknowledgementTracker;
typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;
+typedef std::shared_ptr<Backoff> BackoffPtr;
enum ConsumerTopicType
{
@@ -181,6 +182,9 @@ class ConsumerImpl : public ConsumerImplBase,
void failPendingReceiveCallback();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
void trackMessage(const MessageId& messageId);
+ void internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,
+ const DeadlineTimerPtr& timer,
+ BrokerGetLastMessageIdCallback callback);
Optional<MessageId> clearReceiveQueue();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index adb94a4ad13..575abb69635 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -763,4 +763,36 @@ TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
thread.join();
}
+TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
+ int operationTimeout = 5;
+ ClientConfiguration clientConfiguration;
+ clientConfiguration.setOperationTimeoutSeconds(operationTimeout);
+
+ Client client(lookupUrl, clientConfiguration);
+ const std::string topic =
+ "testGetLastMessageIdBlockWhenConnectionDisconnected-" + std::to_string(time(nullptr));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
+
+ ConsumerImpl& consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+ ClientConnectionWeakPtr conn = PulsarFriend::getClientConnection(consumerImpl);
+
+ PulsarFriend::setClientConnection(consumerImpl, std::weak_ptr<ClientConnection>());
+
+ pulsar::Latch latch(1);
+ auto start = TimeUtils::now();
+
+ consumerImpl.getLastMessageIdAsync([&latch](Result r, const GetLastMessageIdResponse&) -> void {
+ ASSERT_EQ(r, ResultNotConnected);
+ latch.countdown();
+ });
+
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(20)));
+ auto elapsed = TimeUtils::now() - start;
+
+ // getLastMessageIdAsync should be blocked until operationTimeout when the connection is disconnected.
+ ASSERT_GE(elapsed.seconds(), operationTimeout);
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index bc79fce0c4f..b74c0e1241b 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -116,6 +116,10 @@ class PulsarFriend {
static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; }
+ static void setClientConnection(HandlerBase& handler, ClientConnectionWeakPtr conn) {
+ handler.connection_ = conn;
+ }
+
static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
return backoff.firstBackoffTime_;
}
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index bf156927590..799702f9b4c 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -579,3 +579,30 @@ TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
EXPECT_FALSE(hasMessageAvailable);
client.close();
}
+
+TEST(ReaderTest, testReceiveAfterSeek) {
+ Client client(serviceUrl);
+ const std::string topic = "reader-test-receive-after-seek-" + std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+ MessageId seekMessageId;
+ for (int i = 0; i < 5; i++) {
+ MessageId messageId;
+ producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), messageId);
+ if (i == 3) {
+ seekMessageId = messageId;
+ }
+ }
+
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
+
+ reader.seek(seekMessageId);
+
+ bool hasMessageAvailable;
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+
+ client.close();
+}