You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/25 09:04:19 UTC
[rocketmq-client-cpp] branch main updated: Try to submit
consume-task for the current process-queue once its own task has completed
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2b03186 Try to submit consume-task for the current process-queue once its own task has completed
2b03186 is described below
commit 2b0318696b4f0791fe4c428612c6a98126a38c7f
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Oct 25 17:04:08 2021 +0800
Try to submit consume-task for the current process-queue once its own task has completed
---
src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp | 2 +-
src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h | 5 +++++
2 files changed, 6 insertions(+), 1 deletion(-)
diff --git a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
index ebefba6..58ee2ec 100644
--- a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
@@ -262,7 +262,7 @@ void ConsumeFifoMessageService::onAck(const ProcessQueueWeakPtr& process_queue,
SPDLOG_DEBUG("Acknowledge FIFO message[MessageQueue={}, MsgId={}] OK", process_queue_ptr->simpleName(),
message.getMsgId());
process_queue_ptr->unbindFifoConsumeTask();
- signalDispatcher();
+ submitConsumeTask(process_queue);
}
}
diff --git a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h b/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
index 8f42cde..a3712d8 100644
--- a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
+++ b/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
@@ -26,6 +26,11 @@ public:
void shutdown() override;
+ /**
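* @brief Entry point of ConsumeMessageService: submits a consume task for the given process queue.
*
* @param process_queue Weak pointer to the process queue for which a consume task is submitted.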
+ */
void submitConsumeTask(const ProcessQueueWeakPtr& process_queue) override;
MessageListenerType messageListenerType() override;