You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/01/28 14:03:17 UTC
[rocketmq-client-cpp] branch develop updated: WIP:
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/develop by this push:
new 16f0c1e WIP:
16f0c1e is described below
commit 16f0c1e7ab79c801abab534538ea36f84f4804d5
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Fri Jan 28 21:57:29 2022 +0800
WIP:
---
src/main/cpp/client/SessionImpl.cpp | 89 +++++++++++++++++++++++++++++++
src/main/cpp/client/include/Session.h | 23 ++++++++
src/main/cpp/client/include/SessionImpl.h | 20 +++++++
3 files changed, 132 insertions(+)
diff --git a/src/main/cpp/client/SessionImpl.cpp b/src/main/cpp/client/SessionImpl.cpp
index da7e9d1..f047b76 100644
--- a/src/main/cpp/client/SessionImpl.cpp
+++ b/src/main/cpp/client/SessionImpl.cpp
@@ -89,6 +89,95 @@ void SessionImpl::ack(absl::flat_hash_map<std::string, std::string> metadata, co
stub_->async()->AckMessage(client_context, request, response, callback);
}
+// Issues an asynchronous Heartbeat call; `cb` gets the status and response from the completion lambda.
+void SessionImpl::heartbeat(absl::flat_hash_map<std::string, std::string> metadata,
+                            const rmq::HeartbeatRequest* request,
+                            std::function<void(const grpc::Status&, const rmq::HeartbeatResponse&)> cb) {
+  auto* context = new grpc::ClientContext;  // adopted and freed by the completion lambda below
+  auto* reply = new rmq::HeartbeatResponse;  // adopted and freed by the completion lambda below
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+  stub_->async()->Heartbeat(context, request, reply, [context, reply, cb](grpc::Status status) {
+    std::unique_ptr<rmq::HeartbeatResponse> reply_owner(reply);
+    std::unique_ptr<grpc::ClientContext> context_owner(context);  // declared last, so destroyed first
+    cb(status, *reply_owner);
+  });
+}
+
+// Issues an asynchronous HealthCheck call; `cb` gets the status and response from the completion lambda.
+void SessionImpl::healthCheck(absl::flat_hash_map<std::string, std::string> metadata,
+                              const rmq::HealthCheckRequest* request,
+                              std::function<void(const grpc::Status&, const rmq::HealthCheckResponse&)> cb) {
+  auto* context = new grpc::ClientContext;  // adopted and freed by the completion lambda below
+  auto* reply = new rmq::HealthCheckResponse;  // adopted and freed by the completion lambda below
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+  stub_->async()->HealthCheck(context, request, reply, [context, reply, cb](grpc::Status status) {
+    std::unique_ptr<rmq::HealthCheckResponse> reply_owner(reply);
+    std::unique_ptr<grpc::ClientContext> context_owner(context);  // declared last, so destroyed first
+    cb(status, *reply_owner);
+  });
+}
+
+// Issues an asynchronous EndTransaction call; `cb` gets the status and response from the completion lambda.
+void SessionImpl::endTransaction(absl::flat_hash_map<std::string, std::string> metadata,
+                                 const rmq::EndTransactionRequest* request,
+                                 std::function<void(const grpc::Status&, const rmq::EndTransactionResponse&)> cb) {
+  auto* context = new grpc::ClientContext;  // adopted and freed by the completion lambda below
+  auto* reply = new rmq::EndTransactionResponse;  // adopted and freed by the completion lambda below
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+  stub_->async()->EndTransaction(context, request, reply, [context, reply, cb](grpc::Status status) {
+    std::unique_ptr<rmq::EndTransactionResponse> reply_owner(reply);
+    std::unique_ptr<grpc::ClientContext> context_owner(context);  // declared last, so destroyed first
+    cb(status, *reply_owner);
+  });
+}
+
+// Issues an asynchronous QueryOffset call; `cb` gets the status and response from the completion lambda.
+void SessionImpl::queryOffset(absl::flat_hash_map<std::string, std::string> metadata,
+                              const rmq::QueryOffsetRequest* request,
+                              std::function<void(const grpc::Status&, const rmq::QueryOffsetResponse&)> cb) {
+  auto* context = new grpc::ClientContext;  // adopted and freed by the completion lambda below
+  auto* reply = new rmq::QueryOffsetResponse;  // adopted and freed by the completion lambda below
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+  stub_->async()->QueryOffset(context, request, reply, [context, reply, cb](grpc::Status status) {
+    std::unique_ptr<rmq::QueryOffsetResponse> reply_owner(reply);
+    std::unique_ptr<grpc::ClientContext> context_owner(context);  // declared last, so destroyed first
+    cb(status, *reply_owner);
+  });
+}
+
+// Issues an asynchronous PullMessage call; `cb` gets the status and response from the completion lambda.
+void SessionImpl::pull(absl::flat_hash_map<std::string, std::string> metadata, const rmq::PullMessageRequest* request,
+                       std::function<void(const grpc::Status&, const rmq::PullMessageResponse&)> cb) {
+  auto* context = new grpc::ClientContext;  // adopted and freed by the completion lambda below
+  auto* reply = new rmq::PullMessageResponse;  // adopted and freed by the completion lambda below
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+  stub_->async()->PullMessage(context, request, reply, [context, reply, cb](grpc::Status status) {
+    std::unique_ptr<rmq::PullMessageResponse> reply_owner(reply);
+    std::unique_ptr<grpc::ClientContext> context_owner(context);  // declared last, so destroyed first
+    cb(status, *reply_owner);
+  });
+}
+
+// Issues an asynchronous ForwardMessageToDeadLetterQueue call; `cb` gets the status and response on completion.
+void SessionImpl::forwardMessageToDeadLetterQueue(
+    absl::flat_hash_map<std::string, std::string> metadata, const rmq::ForwardMessageToDeadLetterQueueRequest* request,
+    std::function<void(const grpc::Status&, const rmq::ForwardMessageToDeadLetterQueueResponse&)> cb) {
+  auto* context = new grpc::ClientContext;  // adopted and freed by the completion lambda below
+  auto* reply = new rmq::ForwardMessageToDeadLetterQueueResponse;  // adopted and freed by the completion lambda
+  addMetadata(metadata, context);
+  setDeadline(io_timeout_, context);
+  stub_->async()->ForwardMessageToDeadLetterQueue(context, request, reply, [context, reply, cb](grpc::Status status) {
+    std::unique_ptr<rmq::ForwardMessageToDeadLetterQueueResponse> reply_owner(reply);
+    std::unique_ptr<grpc::ClientContext> context_owner(context);  // declared last, so destroyed first
+    cb(status, *reply_owner);
+  });
+}
+
void SessionImpl::addMetadata(const absl::flat_hash_map<std::string, std::string>& metadata,
grpc::ClientContext* client_context) {
for (const auto& entry : metadata) {
diff --git a/src/main/cpp/client/include/Session.h b/src/main/cpp/client/include/Session.h
index 4ad6de1..651f94d 100644
--- a/src/main/cpp/client/include/Session.h
+++ b/src/main/cpp/client/include/Session.h
@@ -56,6 +56,29 @@ public:
virtual void ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) PURE;
+ // Heartbeat request: returns nothing; the outcome reaches `cb` as a grpc::Status plus the HeartbeatResponse.
+ virtual void heartbeat(absl::flat_hash_map<std::string, std::string> metadata, const rmq::HeartbeatRequest* request,
+ std::function<void(const grpc::Status&, const rmq::HeartbeatResponse&)> cb) PURE;
+ // HealthCheck request: returns nothing; the outcome reaches `cb` as a grpc::Status plus the HealthCheckResponse.
+ virtual void healthCheck(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::HealthCheckRequest* request,
+ std::function<void(const grpc::Status&, const rmq::HealthCheckResponse&)> cb) PURE;
+ // EndTransaction request: returns nothing; `cb` receives a grpc::Status plus the EndTransactionResponse.
+ virtual void endTransaction(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::EndTransactionRequest* request,
+ std::function<void(const grpc::Status&, const rmq::EndTransactionResponse&)> cb) PURE;
+ // QueryOffset request: returns nothing; `cb` receives a grpc::Status plus the QueryOffsetResponse.
+ virtual void queryOffset(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::QueryOffsetRequest* request,
+ std::function<void(const grpc::Status&, const rmq::QueryOffsetResponse&)> cb) PURE;
+ // PullMessage request: returns nothing; `cb` receives a grpc::Status plus the PullMessageResponse.
+ virtual void pull(absl::flat_hash_map<std::string, std::string> metadata, const rmq::PullMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::PullMessageResponse&)> cb) PURE;
+ // ForwardMessageToDeadLetterQueue request: returns nothing; `cb` receives a grpc::Status plus the response.
+ virtual void forwardMessageToDeadLetterQueue(
+ absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::ForwardMessageToDeadLetterQueueRequest* request,
+ std::function<void(const grpc::Status&, const rmq::ForwardMessageToDeadLetterQueueResponse&)> cb) PURE;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/include/SessionImpl.h b/src/main/cpp/client/include/SessionImpl.h
index f07f801..9e84aff 100644
--- a/src/main/cpp/client/include/SessionImpl.h
+++ b/src/main/cpp/client/include/SessionImpl.h
@@ -35,6 +35,26 @@ public:
void ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) override;
+ // Override declared here; the definition in SessionImpl.cpp calls stub_->async()->Heartbeat and then `cb`.
+ void heartbeat(absl::flat_hash_map<std::string, std::string> metadata, const rmq::HeartbeatRequest* request,
+ std::function<void(const grpc::Status&, const rmq::HeartbeatResponse&)> cb) override;
+ // Override declared here; the definition in SessionImpl.cpp calls stub_->async()->HealthCheck and then `cb`.
+ void healthCheck(absl::flat_hash_map<std::string, std::string> metadata, const rmq::HealthCheckRequest* request,
+ std::function<void(const grpc::Status&, const rmq::HealthCheckResponse&)> cb) override;
+ // Override declared here; the definition in SessionImpl.cpp calls stub_->async()->EndTransaction and then `cb`.
+ void endTransaction(absl::flat_hash_map<std::string, std::string> metadata, const rmq::EndTransactionRequest* request,
+ std::function<void(const grpc::Status&, const rmq::EndTransactionResponse&)> cb) override;
+ // Override declared here; the definition in SessionImpl.cpp calls stub_->async()->QueryOffset and then `cb`.
+ void queryOffset(absl::flat_hash_map<std::string, std::string> metadata, const rmq::QueryOffsetRequest* request,
+ std::function<void(const grpc::Status&, const rmq::QueryOffsetResponse&)> cb) override;
+ // Override declared here; the definition in SessionImpl.cpp calls stub_->async()->PullMessage and then `cb`.
+ void pull(absl::flat_hash_map<std::string, std::string> metadata, const rmq::PullMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::PullMessageResponse&)> cb) override;
+ // Override declared here; SessionImpl.cpp calls stub_->async()->ForwardMessageToDeadLetterQueue, then `cb`.
+ void forwardMessageToDeadLetterQueue(
+ absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::ForwardMessageToDeadLetterQueueRequest* request,
+ std::function<void(const grpc::Status&, const rmq::ForwardMessageToDeadLetterQueueResponse&)> cb) override;
private:
std::shared_ptr<grpc::Channel> channel_;
std::unique_ptr<rmq::MessagingService::Stub> stub_;