You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/25 10:25:39 UTC

[GitHub] [pulsar-client-cpp] RobertIndie opened a new pull request, #71: [feat] Support expiration for chunked messages

RobertIndie opened a new pull request, #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71

   <!--
   ### Contribution Checklist
     
     - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://docs.google.com/document/d/1d8Pw6ZbWk-_pCKdOmdvx9rnhPiyuxwq60_TrD68d7BA/edit#heading=h.trs9rsex3xom)*. 
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   -->
   
   
   
   ### Motivation
   
   Add support for checking expiration for incomplete chunked messages.
   
   
   ### Modifications
   
   * Add configuration `expireTimeOfIncompleteChunkedMessageMs` to the consumer.
   * Add timer to check the expiration incomplete chunked messages
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013617630


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   https://godbolt.org/z/eezdxMGnc @Demogorgon314 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007627774


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {

Review Comment:
   I think it's okay for the current approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1012822911


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,48 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
+                bool expired =
+                    currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;
+                if (!expired) {
+                    return false;
+                }
+                for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
+                    if (autoAckOldestChunkedMessageOnQueueFull_) {

Review Comment:
   Oh Thanks. Make sense. If the user set `expireTimeOfIncompleteChunkedMessageMs_ ` >0, then it means the user would not care about the message loss. So we can just ack them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007545915


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,27 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    checkExpiredChunkedTimer_->async_wait([this](const boost::system::error_code& ec) -> void {
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   Are there any concerns here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007627774


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {

Review Comment:
   I think it's okay for the current approach. The weak_ptr is already captured in the callback. I think we don't need to `dynamic_pointer_cast` the shared_ptr here. This approach has the same effect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] Demogorgon314 commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1012583780


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,48 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
+                bool expired =
+                    currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;
+                if (!expired) {
+                    return false;
+                }
+                for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
+                    if (autoAckOldestChunkedMessageOnQueueFull_) {

Review Comment:
   I just checked the Java client. It always auto-ack when the message expires. Do we need to follow the Java client behavior?
   
   Also, this config is a little confusing here. It should be auto ack expires chunked message, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007627774


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {

Review Comment:
   I think it's okay for the current approach. The weak_ptr is already captured in the callback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007625318


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,27 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    checkExpiredChunkedTimer_->async_wait([this](const boost::system::error_code& ec) -> void {
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   Are there any concerns here? I still don't understand what the benefit of this approach. I think may be the effect is the same?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] Demogorgon314 commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1012465057


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
+                return currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;

Review Comment:
   Do we need to do ACK for the removed messageIds?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013613834


##########
lib/ConsumerImpl.cc:
##########
@@ -331,6 +371,13 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
                                                  << payload.readableBytes() << " bytes");
 
     Lock lock(chunkProcessMutex_);
+
+    // Lazy task scheduling to expire incomplete chunk message
+    if (!checkExpiredChunkedTimer_) {
+        checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+        triggerCheckExpiredChunkedTimer();
+    }

Review Comment:
   We should add a boolean variable `expireChunkMessageTaskScheduled` like Java client does. You can also use something like `Optional` but since `deadline_timer` is not copyable you have to use the move constructor.
   
   BTW, I am going to remove unnecessary `std::shared_ptr` usages in C++ client. It should not be used everywhere. When writing C++, the best practice is to make the lifetime clear like writing Rust.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007814577


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {

Review Comment:
   Make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013550364


##########
lib/ConsumerImpl.cc:
##########
@@ -331,6 +371,13 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
                                                  << payload.readableBytes() << " bytes");
 
     Lock lock(chunkProcessMutex_);
+
+    // Lazy task scheduling to expire incomplete chunk message
+    if (!checkExpiredChunkedTimer_) {
+        checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+        triggerCheckExpiredChunkedTimer();
+    }

Review Comment:
   Just create the timer in the constructor. `createDeadlineTimer` won't start any timer, it only creates the object. Otherwise, we should always check it's not a nullptr when accessing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013550931


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
+                bool expired =
+                    currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;
+                if (!expired) {
+                    return false;
+                }
+                for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
+                    LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId);
+                    doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) {

Review Comment:
   Sorry I see it now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1014963288


##########
lib/ConsumerImpl.h:
##########
@@ -295,6 +300,12 @@ class ConsumerImpl : public ConsumerImplBase {
     MapCache<std::string, ChunkedMessageCtx> chunkedMessageCache_;
     mutable std::mutex chunkProcessMutex_;
 
+    const long expireTimeOfIncompleteChunkedMessageMs_;
+    DeadlineTimerPtr checkExpiredChunkedTimer_;
+    std::atomic_bool expireChunkMessageTaskScheduled_ = {false};

Review Comment:
   You can simply use the brace initialization for those have no copy constructors.
   
   ```suggestion
       std::atomic_bool expireChunkMessageTaskScheduled_{false};
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie merged pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie merged PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1014651411


##########
lib/ConsumerImpl.h:
##########
@@ -302,6 +302,7 @@ class ConsumerImpl : public ConsumerImplBase {
 
     const long expireTimeOfIncompleteChunkedMessageMs_;
     DeadlineTimerPtr checkExpiredChunkedTimer_;
+    bool expireChunkMessageTaskScheduled_ = false;

Review Comment:
   You should use an `atomic_bool` here and use CAS to set it true.
   
   I see Java client doesn't use `AtomicBoolean`, but `processMessageChunk` could be called in different threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013547938


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
+                bool expired =
+                    currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;
+                if (!expired) {
+                    return false;
+                }
+                for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
+                    LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId);
+                    doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) {

Review Comment:
   Why did you capture `uuid` and `msgId` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013557763


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1012573009


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
+                return currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;

Review Comment:
   Good point! The user should be able to choose the behavior(ack or just discard it).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1006850483


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,27 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();

Review Comment:
   If `checkExpiredChunkedTimer_`  already existed, not need to recreate it.



##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,27 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    checkExpiredChunkedTimer_->async_wait([this](const boost::system::error_code& ec) -> void {
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   Just capture `expireTimeOfIncompleteChunkedMessageMs_ ` on here.



##########
lib/ConsumerImpl.h:
##########
@@ -33,8 +33,11 @@
 #include "NegativeAcksTracker.h"
 #include "Synchronized.h"
 #include "TestUtil.h"
+#include "TimeUtils.h"
 #include "UnboundedBlockingQueue.h"
 
+using namespace pulsar;

Review Comment:
   There is no need to declare using pulsar namespace here because the class is already in this namespace



##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,27 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    checkExpiredChunkedTimer_->async_wait([this](const boost::system::error_code& ec) -> void {

Review Comment:
   Need to capture `smart pointers` to prevent segmentation faults. Weak pointers are recommended to prevent circular references.
   https://github.com/apache/pulsar/pull/17481



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013556982


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   It's better capture `currentTimeMs` by value. Though the lambda is called before `removeOldestValuesIf` finished, capturing a reference (the 8 bytes underlying pointer) is not better than capturing a long (8 bytes).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013616579


##########
lib/ConsumerImpl.cc:
##########
@@ -331,6 +371,13 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
                                                  << payload.readableBytes() << " bytes");
 
     Lock lock(chunkProcessMutex_);
+
+    // Lazy task scheduling to expire incomplete chunk message
+    if (!checkExpiredChunkedTimer_) {
+        checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+        triggerCheckExpiredChunkedTimer();
+    }

Review Comment:
   https://godbolt.org/z/eezdxMGnc @Demogorgon314 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007627774


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {

Review Comment:
   I think it's okay for the current approach. The weak_ptr is already captured in the callback. I think we don't need to `dynamic_pointer_cast` the shared_ptr here. This approach has the same effect.
   
   You can refer here: https://github.com/apache/pulsar-client-cpp/blob/2248adcfdf23682221e36bf784287ce325589f2f/lib/RetryableLookupService.h#L131-L132



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1014959296


##########
lib/ConsumerImpl.h:
##########
@@ -302,6 +302,7 @@ class ConsumerImpl : public ConsumerImplBase {
 
     const long expireTimeOfIncompleteChunkedMessageMs_;
     DeadlineTimerPtr checkExpiredChunkedTimer_;
+    bool expireChunkMessageTaskScheduled_ = false;

Review Comment:
   Good point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] Demogorgon314 commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1012473479


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
+                return currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;

Review Comment:
   I think we should do ACK here. Otherwise, when the client restart, the consumer will receive the removed message again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013706319


##########
lib/ConsumerImpl.cc:
##########
@@ -331,6 +371,13 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
                                                  << payload.readableBytes() << " bytes");
 
     Lock lock(chunkProcessMutex_);
+
+    // Lazy task scheduling to expire incomplete chunk message
+    if (!checkExpiredChunkedTimer_) {
+        checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+        triggerCheckExpiredChunkedTimer();
+    }

Review Comment:
   You're right. Agree to make the lifetime clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] Demogorgon314 commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013559666


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   Question: if we use x86 architectures 32-bit, it will use a 32-bit(4 bytes) pointer here, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] RobertIndie commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013556273


##########
lib/ConsumerImpl.cc:
##########
@@ -331,6 +371,13 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
                                                  << payload.readableBytes() << " bytes");
 
     Lock lock(chunkProcessMutex_);
+
+    // Lazy task scheduling to expire incomplete chunk message
+    if (!checkExpiredChunkedTimer_) {
+        checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+        triggerCheckExpiredChunkedTimer();
+    }

Review Comment:
   The `checkExpiredChunkedTimer_ ` should always be not null in `triggerCheckExpiredChunkedTimer `; outside this method, we have always checked the null for it.
   If we go this way, we should always get the state of checkExpiredChunkedTimer_ here to check if the timer has been started to implement lazy scheduling. I haven't found a more elegant way to get the state. I think the current approach should be better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1013608056


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   IIRC, long is also 4 bytes in 32-bit arch. BTW, 4 bytes vs 8 bytes is not so much different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007591353


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,31 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {

Review Comment:
   Here still caught `this`, You can refer:
   
   ``` c++
       std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
       checkExpiredChunkedTimer_->async_wait([weakSelf](const boost::system::error_code& ec) -> void {
           auto self = weakSelf.lock();
           if (!self || ec) {
               return;
           }
           Lock lock(self->chunkProcessMutex_);
           long currentTimeMs = TimeUtils::currentTimeMillis();
           self->chunkedMessageCache_.removeOldestValuesIf(
               [self, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
                   return currentTimeMs > ctx.getReceivedTimeMs() + self->expireTimeOfIncompleteChunkedMessageMs_;
               });
           self->triggerCheckExpiredChunkedTimer();
           return;
       });
   
   ```



##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,27 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    checkExpiredChunkedTimer_->async_wait([this](const boost::system::error_code& ec) -> void {
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   Oh, You still caught `this` above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #71: [feat] Support expiration for chunked messages

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #71:
URL: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007816394


##########
lib/ConsumerImpl.cc:
##########
@@ -319,6 +320,27 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
+    checkExpiredChunkedTimer_->expires_from_now(
+        boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    checkExpiredChunkedTimer_->async_wait([this](const boost::system::error_code& ec) -> void {
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, &currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {

Review Comment:
   Yes, no problem here. I mainly want to express that this is captured here: https://github.com/apache/pulsar-client-cpp/pull/71#discussion_r1007591353
   
   Now that you've explained it, you can resolve it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org