You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/04 10:02:39 UTC

[GitHub] [pulsar] RobertIndie opened a new pull request, #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

RobertIndie opened a new pull request, #16943:
URL: https://github.com/apache/pulsar/pull/16943

   <!--
   ### Contribution Checklist
     
     - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://docs.google.com/document/d/1d8Pw6ZbWk-_pCKdOmdvx9rnhPiyuxwq60_TrD68d7BA/edit#heading=h.trs9rsex3xom)*. 
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   Fixes #16761
   
   ### Motivation
   
   When C++ client calls `ConsumerImpl::getLastMessageIdAsync`, if the connection is not established, the callback will complete with `ResultNotConnected`.
   
   ### Modifications
   
   * Wait until the connection is established while getting the last message id.
   
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16943:
URL: https://github.com/apache/pulsar/pull/16943#discussion_r950028292


##########
pulsar-client-cpp/tests/ConsumerTest.cc:
##########
@@ -763,4 +763,36 @@ TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
     thread.join();
 }
 
+TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
+    int operationTimeout = 10;

Review Comment:
   10 seconds might be too long for a single test, maybe you can reduce it to 5 seconds (just a suggestion).



##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -1326,8 +1336,26 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
             callback(ResultUnsupportedVersionError, MessageId());
         }
     } else {
-        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
-        callback(ResultNotConnected, MessageId());
+        TimeDuration next = std::min(remainTime, backoff->next());
+        if (next.total_milliseconds() <= 0) {
+            LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+            callback(ResultNotConnected, MessageId());
+            return;
+        }
+        remainTime -= next;
+
+        timer->expires_from_now(next);
+
+        timer->async_wait([this, backoff, remainTime, timer, next,
+                           callback](const boost::system::error_code& ec) -> void {
+            if (ec) {
+                LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "].");
+                return;
+            }

Review Comment:
   `if (ec)` (in short for `if (ec != 0)`) doesn't always mean the operation is cancelled. See how we process the error code in https://github.com/apache/pulsar/blob/3ecf34abf99afd7c53a681ab82d5bcec2bc499a1/pulsar-client-cpp/lib/ClientConnection.cc#L594-L598



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #16943:
URL: https://github.com/apache/pulsar/pull/16943#discussion_r951112751


##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -1326,8 +1336,26 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
             callback(ResultUnsupportedVersionError, MessageId());
         }
     } else {
-        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
-        callback(ResultNotConnected, MessageId());
+        TimeDuration next = std::min(remainTime, backoff->next());
+        if (next.total_milliseconds() <= 0) {
+            LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+            callback(ResultNotConnected, MessageId());
+            return;
+        }
+        remainTime -= next;
+
+        timer->expires_from_now(next);
+
+        timer->async_wait([this, backoff, remainTime, timer, next,
+                           callback](const boost::system::error_code& ec) -> void {

Review Comment:
   Looks like there is a similar problem in the current codes. https://github.com/apache/pulsar/blob/c4bd7ae15a1e37075fb5a0b3b4636591ed2462c8/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc#L153-L158
   
   Could you guide me on how to capture `shared_from_this()`? I haven't found a good way to solve it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #16943:
URL: https://github.com/apache/pulsar/pull/16943#discussion_r951237316


##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -1326,8 +1336,26 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
             callback(ResultUnsupportedVersionError, MessageId());
         }
     } else {
-        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
-        callback(ResultNotConnected, MessageId());
+        TimeDuration next = std::min(remainTime, backoff->next());
+        if (next.total_milliseconds() <= 0) {
+            LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+            callback(ResultNotConnected, MessageId());
+            return;
+        }
+        remainTime -= next;
+
+        timer->expires_from_now(next);
+
+        timer->async_wait([this, backoff, remainTime, timer, next,
+                           callback](const boost::system::error_code& ec) -> void {

Review Comment:
   Oh. I get it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16943:
URL: https://github.com/apache/pulsar/pull/16943#discussion_r951127633


##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -1326,8 +1336,26 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
             callback(ResultUnsupportedVersionError, MessageId());
         }
     } else {
-        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
-        callback(ResultNotConnected, MessageId());
+        TimeDuration next = std::min(remainTime, backoff->next());
+        if (next.total_milliseconds() <= 0) {
+            LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+            callback(ResultNotConnected, MessageId());
+            return;
+        }
+        remainTime -= next;
+
+        timer->expires_from_now(next);
+
+        timer->async_wait([this, backoff, remainTime, timer, next,
+                           callback](const boost::system::error_code& ec) -> void {

Review Comment:
   Your reference is an example. `self` is what `shared_from_this()` returns, this lambda captures `self` by value so that the reference count will increase by one. After the lambda ends, the reference count will decrease by one, so the object keeps valid (refcnt is always > 0) during the lifetime of the lambda.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie merged pull request #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

Posted by GitBox <gi...@apache.org>.
RobertIndie merged PR #16943:
URL: https://github.com/apache/pulsar/pull/16943


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16943:
URL: https://github.com/apache/pulsar/pull/16943#discussion_r937885284


##########
pulsar-client-cpp/tests/ReaderTest.cc:
##########
@@ -579,3 +579,30 @@ TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
     EXPECT_FALSE(hasMessageAvailable);
     client.close();
 }
+
+TEST(ReaderTest, testReceiveAfterSeek) {
+    Client client(serviceUrl);
+    const std::string topic = "reader-test-receive-after-seek-" + std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    MessageId seekMessageId;
+    for (int i = 0; i < 5; i++) {
+        MessageId messageId;
+        producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), messageId);
+        if (i == 3) {
+            seekMessageId = messageId;
+        }
+    }
+
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
+
+    reader.seek(seekMessageId);
+
+    bool hasMessageAvailable;
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));

Review Comment:
   When a connection is available, this assertion always passes,  right? 
   
   Does this unit test have a mock connection unavailable?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #16943:
URL: https://github.com/apache/pulsar/pull/16943#discussion_r939806303


##########
pulsar-client-cpp/tests/ReaderTest.cc:
##########
@@ -579,3 +579,30 @@ TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
     EXPECT_FALSE(hasMessageAvailable);
     client.close();
 }
+
+TEST(ReaderTest, testReceiveAfterSeek) {
+    Client client(serviceUrl);
+    const std::string topic = "reader-test-receive-after-seek-" + std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    MessageId seekMessageId;
+    for (int i = 0; i < 5; i++) {
+        MessageId messageId;
+        producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), messageId);
+        if (i == 3) {
+            seekMessageId = messageId;
+        }
+    }
+
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
+
+    reader.seek(seekMessageId);
+
+    bool hasMessageAvailable;
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));

Review Comment:
   Not really. The connection will be reconnected after the seek operation.
   
   I have added a test `testGetLastMessageIdBlockWhenConnectionDisconnected` to test it more rigorous. PTAL. Thanks. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16943: [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16943:
URL: https://github.com/apache/pulsar/pull/16943#discussion_r950002769


##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -1326,8 +1336,26 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
             callback(ResultUnsupportedVersionError, MessageId());
         }
     } else {
-        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
-        callback(ResultNotConnected, MessageId());
+        TimeDuration next = std::min(remainTime, backoff->next());
+        if (next.total_milliseconds() <= 0) {
+            LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+            callback(ResultNotConnected, MessageId());
+            return;
+        }
+        remainTime -= next;
+
+        timer->expires_from_now(next);
+
+        timer->async_wait([this, backoff, remainTime, timer, next,
+                           callback](const boost::system::error_code& ec) -> void {

Review Comment:
   It's better to capture `shared_from_this()` here because when the callback is executed, the reference count of `std::shared_ptr<ConsumerImpl>` might be zero so that `this` points to an invalid address. Capturing the `shared_from_this()` can extend the lifetime of the object to ensure `this` always points to a valid address.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org