You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/01/28 13:42:18 UTC
[rocketmq-client-cpp] branch develop updated: WIP: Implement more API using async gRPC API
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/develop by this push:
new 87b1ee2 WIP: Implement more API using async gRPC API
87b1ee2 is described below
commit 87b1ee26a83770ea2adb8a2f75f2d54a3ce71c1f
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Fri Jan 28 21:41:30 2022 +0800
WIP: Implement more API using async gRPC API
---
src/main/cpp/client/SessionImpl.cpp | 79 +++++++++++++++++++++++++++++--
src/main/cpp/client/include/Session.h | 16 ++++++-
src/main/cpp/client/include/SessionImpl.h | 21 ++++++++
3 files changed, 112 insertions(+), 4 deletions(-)
diff --git a/src/main/cpp/client/SessionImpl.cpp b/src/main/cpp/client/SessionImpl.cpp
index 470c90d..da7e9d1 100644
--- a/src/main/cpp/client/SessionImpl.cpp
+++ b/src/main/cpp/client/SessionImpl.cpp
@@ -1,5 +1,7 @@
#include "SessionImpl.h"
+#include <chrono>
+#include <grpcpp/client_context.h>
#include <string>
#include "absl/memory/memory.h"
@@ -14,9 +16,8 @@ void SessionImpl::queryRoute(absl::flat_hash_map<std::string, std::string> metad
auto response = new rmq::QueryRouteResponse;
auto client_context = new grpc::ClientContext;
- for (const auto& entry : metadata) {
- client_context->AddMetadata(entry.first, entry.second);
- }
+ addMetadata(metadata, client_context);
+ setDeadline(io_timeout_, client_context);
auto callback = [=](grpc::Status s) {
auto reply = absl::WrapUnique(response);
@@ -27,4 +28,76 @@ void SessionImpl::queryRoute(absl::flat_hash_map<std::string, std::string> metad
stub_->async()->QueryRoute(client_context, request, response, callback);
}
+/**
+ * Issues an asynchronous SendMessage RPC through the stub.
+ *
+ * The response message and the client context are heap-allocated here and are released inside the completion
+ * handler once `cb` has returned.
+ *
+ * @param metadata Key/value pairs attached to the call as gRPC metadata.
+ * @param request Request message handed to the stub.
+ *                NOTE(review): presumably must stay valid until the RPC completes — confirm against the caller.
+ * @param cb Called from the completion handler with the gRPC status and the response message.
+ */
+void SessionImpl::send(absl::flat_hash_map<std::string, std::string> metadata, const rmq::SendMessageRequest* request,
+                       std::function<void(const grpc::Status&, const rmq::SendMessageResponse&)> cb) {
+  auto* raw_response = new rmq::SendMessageResponse;
+  auto* raw_context = new grpc::ClientContext;
+
+  setDeadline(io_timeout_, raw_context);
+  addMetadata(metadata, raw_context);
+
+  // The handler adopts both raw pointers, so they are destroyed when it goes out of scope after `cb` has run.
+  auto on_done = [raw_response, raw_context, cb](grpc::Status status) {
+    auto response_owner = absl::WrapUnique(raw_response);
+    auto context_owner = absl::WrapUnique(raw_context);
+    cb(status, *response_owner);
+  };
+
+  stub_->async()->SendMessage(raw_context, request, raw_response, on_done);
+}
+
+/**
+ * Issues an asynchronous QueryAssignment RPC through the stub.
+ *
+ * The response message and the client context are heap-allocated here and are released inside the completion
+ * handler once `cb` has returned.
+ *
+ * @param metadata Key/value pairs attached to the call as gRPC metadata.
+ * @param request Request message handed to the stub.
+ *                NOTE(review): presumably must stay valid until the RPC completes — confirm against the caller.
+ * @param cb Called from the completion handler with the gRPC status and the response message.
+ */
+void SessionImpl::queryAssignment(absl::flat_hash_map<std::string, std::string> metadata,
+                                  const rmq::QueryAssignmentRequest* request,
+                                  std::function<void(const grpc::Status&, const rmq::QueryAssignmentResponse&)> cb) {
+  auto* raw_response = new rmq::QueryAssignmentResponse;
+  auto* raw_context = new grpc::ClientContext;
+
+  addMetadata(metadata, raw_context);
+  setDeadline(io_timeout_, raw_context);
+
+  // The handler adopts both raw pointers, so they are destroyed when it goes out of scope after `cb` has run.
+  auto on_done = [raw_response, raw_context, cb](grpc::Status rpc_status) {
+    auto response_owner = absl::WrapUnique(raw_response);
+    auto context_owner = absl::WrapUnique(raw_context);
+    cb(rpc_status, *response_owner);
+  };
+
+  stub_->async()->QueryAssignment(raw_context, request, raw_response, on_done);
+}
+
+/**
+ * Issues an asynchronous ReceiveMessage RPC through the stub.
+ *
+ * The response message and the client context are heap-allocated here and are released inside the completion
+ * handler once `cb` has returned.
+ *
+ * @param metadata Key/value pairs attached to the call as gRPC metadata.
+ * @param request Request message handed to the stub.
+ *                NOTE(review): presumably must stay valid until the RPC completes — confirm against the caller.
+ * @param cb Called from the completion handler with the gRPC status and the response message.
+ */
+void SessionImpl::receive(absl::flat_hash_map<std::string, std::string> metadata,
+                          const rmq::ReceiveMessageRequest* request,
+                          std::function<void(const grpc::Status&, const rmq::ReceiveMessageResponse&)> cb) {
+  auto* raw_response = new rmq::ReceiveMessageResponse;
+  auto* raw_context = new grpc::ClientContext;
+
+  addMetadata(metadata, raw_context);
+  setDeadline(io_timeout_, raw_context);
+
+  // The handler adopts both raw pointers, so they are destroyed when it goes out of scope after `cb` has run.
+  auto on_done = [raw_response, raw_context, cb](grpc::Status rpc_status) {
+    auto response_owner = absl::WrapUnique(raw_response);
+    auto context_owner = absl::WrapUnique(raw_context);
+    cb(rpc_status, *response_owner);
+  };
+
+  stub_->async()->ReceiveMessage(raw_context, request, raw_response, on_done);
+}
+
+/**
+ * Issues an asynchronous AckMessage RPC through the stub.
+ *
+ * The response message and the client context are heap-allocated here and are released inside the completion
+ * handler once `cb` has returned.
+ *
+ * @param metadata Key/value pairs attached to the call as gRPC metadata.
+ * @param request Request message handed to the stub.
+ *                NOTE(review): presumably must stay valid until the RPC completes — confirm against the caller.
+ * @param cb Called from the completion handler with the gRPC status and the response message.
+ */
+void SessionImpl::ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
+                      std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) {
+  auto* raw_response = new rmq::AckMessageResponse;
+  auto* raw_context = new grpc::ClientContext;
+
+  addMetadata(metadata, raw_context);
+  setDeadline(io_timeout_, raw_context);
+
+  // The handler adopts both raw pointers, so they are destroyed when it goes out of scope after `cb` has run.
+  auto on_done = [raw_response, raw_context, cb](grpc::Status rpc_status) {
+    auto response_owner = absl::WrapUnique(raw_response);
+    auto context_owner = absl::WrapUnique(raw_context);
+    cb(rpc_status, *response_owner);
+  };
+
+  stub_->async()->AckMessage(raw_context, request, raw_response, on_done);
+}
+
+/**
+ * Attaches every key/value pair of `metadata` to the client context through AddMetadata.
+ *
+ * @param metadata Pairs to attach; an empty map results in no calls.
+ * @param client_context Context receiving the metadata; dereferenced unconditionally, so it must not be null.
+ */
+void SessionImpl::addMetadata(const absl::flat_hash_map<std::string, std::string>& metadata,
+                              grpc::ClientContext* client_context) {
+  for (const auto& kv : metadata) {
+    const std::string& key = kv.first;
+    const std::string& value = kv.second;
+    client_context->AddMetadata(key, value);
+  }
+}
+
+/**
+ * Sets the deadline of the given client context to the current system-clock time plus `timeout`.
+ *
+ * @param timeout Duration, measured from the moment of this call, after which the deadline expires.
+ * @param client_context Context whose deadline is set; dereferenced unconditionally, so it must not be null.
+ */
+void SessionImpl::setDeadline(std::chrono::milliseconds timeout, grpc::ClientContext* client_context) {
+  // Use the caller-supplied timeout rather than the io_timeout_ member so the parameter is actually honoured.
+  client_context->set_deadline(std::chrono::system_clock::now() + timeout);
+}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/include/Session.h b/src/main/cpp/client/include/Session.h
index 46780ea..4ad6de1 100644
--- a/src/main/cpp/client/include/Session.h
+++ b/src/main/cpp/client/include/Session.h
@@ -41,7 +41,21 @@ public:
virtual ~Session() = default;
virtual void queryRoute(absl::flat_hash_map<std::string, std::string> metadata, const rmq::QueryRouteRequest* request,
- std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> callback) PURE;
+ std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> cb) PURE;
+
+ virtual void send(absl::flat_hash_map<std::string, std::string> metadata, const rmq::SendMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::SendMessageResponse&)> cb) PURE;
+
+ virtual void queryAssignment(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::QueryAssignmentRequest* request,
+ std::function<void(const grpc::Status&, const rmq::QueryAssignmentResponse&)> cb) PURE;
+
+ virtual void receive(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::ReceiveMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::ReceiveMessageResponse&)> cb) PURE;
+
+ virtual void ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) PURE;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/include/SessionImpl.h b/src/main/cpp/client/include/SessionImpl.h
index 4780178..f07f801 100644
--- a/src/main/cpp/client/include/SessionImpl.h
+++ b/src/main/cpp/client/include/SessionImpl.h
@@ -2,6 +2,10 @@
#include "Session.h"
#include <apache/rocketmq/v1/definition.pb.h>
+#include <apache/rocketmq/v1/service.pb.h>
+#include <chrono>
+#include <functional>
+#include <grpcpp/client_context.h>
#include <memory>
#include <string>
@@ -18,9 +22,26 @@ public:
void queryRoute(absl::flat_hash_map<std::string, std::string> metadata, const rmq::QueryRouteRequest* request,
std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> callback) override;
+ void send(absl::flat_hash_map<std::string, std::string> metadata, const rmq::SendMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::SendMessageResponse&)> cb) override;
+
+ void queryAssignment(absl::flat_hash_map<std::string, std::string> metadata,
+ const rmq::QueryAssignmentRequest* request,
+ std::function<void(const grpc::Status&, const rmq::QueryAssignmentResponse&)> cb) override;
+
+ void receive(absl::flat_hash_map<std::string, std::string> metadata, const rmq::ReceiveMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::ReceiveMessageResponse&)> cb) override;
+
+ void ack(absl::flat_hash_map<std::string, std::string> metadata, const rmq::AckMessageRequest* request,
+ std::function<void(const grpc::Status&, const rmq::AckMessageResponse&)> cb) override;
+
private:
std::shared_ptr<grpc::Channel> channel_;
std::unique_ptr<rmq::MessagingService::Stub> stub_;
+ std::chrono::seconds io_timeout_{3};
+
+ void addMetadata(const absl::flat_hash_map<std::string, std::string>& metadata, grpc::ClientContext* client_context);
+ void setDeadline(std::chrono::milliseconds timeout, grpc::ClientContext* client_context);
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file