You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/18 11:36:03 UTC

[rocketmq-client-cpp] branch main updated: Allow configuring custom offset-store

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a7184b2  Allow configuring custom offset-store
a7184b2 is described below

commit a7184b2a59d5c08227b329160a1f9525c2a6b546
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Oct 18 19:35:37 2021 +0800

    Allow configuring custom offset-store
---
 api/rocketmq/DefaultMQPushConsumer.h                   | 3 +++
 src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp        | 4 ++++
 src/main/cpp/rocketmq/include/PushConsumer.h           | 3 +++
 src/main/cpp/rocketmq/include/PushConsumerImpl.h       | 2 +-
 src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h | 2 ++
 5 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/api/rocketmq/DefaultMQPushConsumer.h b/api/rocketmq/DefaultMQPushConsumer.h
index 03ac7d1..0693b8d 100644
--- a/api/rocketmq/DefaultMQPushConsumer.h
+++ b/api/rocketmq/DefaultMQPushConsumer.h
@@ -28,6 +28,7 @@
 #include "MessageListener.h"
 #include "rocketmq/Executor.h"
 #include "rocketmq/MessageModel.h"
+#include "rocketmq/OffsetStore.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -110,6 +111,8 @@ public:
 
   std::string groupName() const;
 
+  void setOffsetStore(std::unique_ptr<OffsetStore> offset_store);
+
 private:
   std::shared_ptr<PushConsumerImpl> impl_;
 };
diff --git a/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp b/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
index 7a50dac..890221c 100644
--- a/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
+++ b/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
@@ -127,4 +127,8 @@ std::string DefaultMQPushConsumer::groupName() const {
   return impl_->getGroupName();
 }
 
+void DefaultMQPushConsumer::setOffsetStore(std::unique_ptr<OffsetStore> offset_store) {
+  impl_->setOffsetStore(std::move(offset_store));
+}
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/PushConsumer.h b/src/main/cpp/rocketmq/include/PushConsumer.h
index fee3136..cf2da65 100644
--- a/src/main/cpp/rocketmq/include/PushConsumer.h
+++ b/src/main/cpp/rocketmq/include/PushConsumer.h
@@ -25,6 +25,7 @@
 #include "rocketmq/Executor.h"
 #include "rocketmq/MessageListener.h"
 #include "rocketmq/MessageModel.h"
+#include "rocketmq/OffsetStore.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -53,6 +54,8 @@ public:
   virtual bool receiveMessage(const MQMessageQueue& message_queue, const FilterExpression& filter_expression) = 0;
 
   virtual MessageListener* messageListener() = 0;
+
+  virtual void setOffsetStore(std::unique_ptr<OffsetStore> offset_store) = 0;
 };
 
 using PushConsumerSharedPtr = std::shared_ptr<PushConsumer>;
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 238e377..399b87e 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -158,7 +158,7 @@ public:
     message_model_ = message_model;
   }
 
-  void offsetStore(std::unique_ptr<OffsetStore> offset_store) {
+  void setOffsetStore(std::unique_ptr<OffsetStore> offset_store) override {
     offset_store_ = std::move(offset_store);
   }
 
diff --git a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
index 63275ab..e4508ba 100644
--- a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
+++ b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
@@ -46,6 +46,8 @@ public:
   MOCK_METHOD(bool, receiveMessage, (const MQMessageQueue&, const FilterExpression&), (override));
 
   MOCK_METHOD(MessageListener*, messageListener, (), (override));
+
+  MOCK_METHOD(void, setOffsetStore, (std::unique_ptr<OffsetStore>), (override));
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file