You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/18 11:36:03 UTC
[rocketmq-client-cpp] branch main updated: Allow configuring custom
offset-store
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a7184b2 Allow configuring custom offset-store
a7184b2 is described below
commit a7184b2a59d5c08227b329160a1f9525c2a6b546
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Oct 18 19:35:37 2021 +0800
Allow configuring custom offset-store
---
api/rocketmq/DefaultMQPushConsumer.h | 3 +++
src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp | 4 ++++
src/main/cpp/rocketmq/include/PushConsumer.h | 3 +++
src/main/cpp/rocketmq/include/PushConsumerImpl.h | 2 +-
src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h | 2 ++
5 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/api/rocketmq/DefaultMQPushConsumer.h b/api/rocketmq/DefaultMQPushConsumer.h
index 03ac7d1..0693b8d 100644
--- a/api/rocketmq/DefaultMQPushConsumer.h
+++ b/api/rocketmq/DefaultMQPushConsumer.h
@@ -28,6 +28,7 @@
#include "MessageListener.h"
#include "rocketmq/Executor.h"
#include "rocketmq/MessageModel.h"
+#include "rocketmq/OffsetStore.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -110,6 +111,8 @@ public:
std::string groupName() const;
+ void setOffsetStore(std::unique_ptr<OffsetStore> offset_store);
+
private:
std::shared_ptr<PushConsumerImpl> impl_;
};
diff --git a/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp b/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
index 7a50dac..890221c 100644
--- a/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
+++ b/src/main/cpp/rocketmq/DefaultMQPushConsumer.cpp
@@ -127,4 +127,8 @@ std::string DefaultMQPushConsumer::groupName() const {
return impl_->getGroupName();
}
+void DefaultMQPushConsumer::setOffsetStore(std::unique_ptr<OffsetStore> offset_store) {
+ impl_->setOffsetStore(std::move(offset_store));
+}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/PushConsumer.h b/src/main/cpp/rocketmq/include/PushConsumer.h
index fee3136..cf2da65 100644
--- a/src/main/cpp/rocketmq/include/PushConsumer.h
+++ b/src/main/cpp/rocketmq/include/PushConsumer.h
@@ -25,6 +25,7 @@
#include "rocketmq/Executor.h"
#include "rocketmq/MessageListener.h"
#include "rocketmq/MessageModel.h"
+#include "rocketmq/OffsetStore.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -53,6 +54,8 @@ public:
virtual bool receiveMessage(const MQMessageQueue& message_queue, const FilterExpression& filter_expression) = 0;
virtual MessageListener* messageListener() = 0;
+
+ virtual void setOffsetStore(std::unique_ptr<OffsetStore> offset_store) = 0;
};
using PushConsumerSharedPtr = std::shared_ptr<PushConsumer>;
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 238e377..399b87e 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -158,7 +158,7 @@ public:
message_model_ = message_model;
}
- void offsetStore(std::unique_ptr<OffsetStore> offset_store) {
+ void setOffsetStore(std::unique_ptr<OffsetStore> offset_store) override {
offset_store_ = std::move(offset_store);
}
diff --git a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
index 63275ab..e4508ba 100644
--- a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
+++ b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
@@ -46,6 +46,8 @@ public:
MOCK_METHOD(bool, receiveMessage, (const MQMessageQueue&, const FilterExpression&), (override));
MOCK_METHOD(MessageListener*, messageListener, (), (override));
+
+ MOCK_METHOD(void, setOffsetStore, (std::unique_ptr<OffsetStore>), (override));
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file