You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/01/28 12:06:08 UTC

[rocketmq-client-cpp] branch develop updated: Implement queryRoute using async API

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/develop by this push:
     new e73dfad  Implement queryRoute using async API
e73dfad is described below

commit e73dfad9289d31c257d54e1e58f0e55cca04ce5c
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Fri Jan 28 20:05:49 2022 +0800

    Implement queryRoute using async API
---
 api/rocketmq/RocketMQ.h                   |  4 ++-
 src/main/cpp/client/SessionImpl.cpp       | 30 ++++++++++++++++++++
 src/main/cpp/client/include/Session.h     | 47 +++++++++++++++++++++++++++++++
 src/main/cpp/client/include/SessionImpl.h | 26 +++++++++++++++++
 4 files changed, 106 insertions(+), 1 deletion(-)

diff --git a/api/rocketmq/RocketMQ.h b/api/rocketmq/RocketMQ.h
index bc8e10b..7b714e3 100644
--- a/api/rocketmq/RocketMQ.h
+++ b/api/rocketmq/RocketMQ.h
@@ -21,4 +21,6 @@
 #endif
 
 #define ROCKETMQ_NAMESPACE_BEGIN namespace ROCKETMQ_NAMESPACE {
-#define ROCKETMQ_NAMESPACE_END }
\ No newline at end of file
+#define ROCKETMQ_NAMESPACE_END }
+
+#define PURE = 0
\ No newline at end of file
diff --git a/src/main/cpp/client/SessionImpl.cpp b/src/main/cpp/client/SessionImpl.cpp
new file mode 100644
index 0000000..470c90d
--- /dev/null
+++ b/src/main/cpp/client/SessionImpl.cpp
@@ -0,0 +1,30 @@
+#include "SessionImpl.h"
+
+#include <string>
+
+#include "absl/memory/memory.h"
+#include "apache/rocketmq/v1/service.pb.h"
+#include "grpcpp/client_context.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void SessionImpl::queryRoute(absl::flat_hash_map<std::string, std::string> metadata,
+                             const rmq::QueryRouteRequest* request,
+                             std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> cb) {
+  auto response = new rmq::QueryRouteResponse;
+  auto client_context = new grpc::ClientContext;
+
+  for (const auto& entry : metadata) {
+    client_context->AddMetadata(entry.first, entry.second);
+  }
+
+  auto callback = [=](grpc::Status s) {
+    auto reply = absl::WrapUnique(response);
+    auto ctx = absl::WrapUnique(client_context);
+    cb(s, *response);
+  };
+
+  stub_->async()->QueryRoute(client_context, request, response, callback);
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/include/Session.h b/src/main/cpp/client/include/Session.h
new file mode 100644
index 0000000..46780ea
--- /dev/null
+++ b/src/main/cpp/client/include/Session.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <grpcpp/client_context.h>
+#include <grpcpp/impl/codegen/status.h>
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/strings/string_view.h"
+#include "apache/rocketmq/v1/definition.grpc.pb.h"
+#include "apache/rocketmq/v1/service.grpc.pb.h"
+#include "apache/rocketmq/v1/service.pb.h"
+#include "grpcpp/grpcpp.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+namespace rmq = apache::rocketmq::v1;
+
+class Session {
+public:
+  virtual ~Session() = default;
+
+  virtual void queryRoute(absl::flat_hash_map<std::string, std::string> metadata, const rmq::QueryRouteRequest* request,
+                          std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> callback) PURE;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/client/include/SessionImpl.h b/src/main/cpp/client/include/SessionImpl.h
new file mode 100644
index 0000000..4780178
--- /dev/null
+++ b/src/main/cpp/client/include/SessionImpl.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include "Session.h"
+#include <apache/rocketmq/v1/definition.pb.h>
+#include <memory>
+#include <string>
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class SessionImpl : public Session {
+public:
+  SessionImpl(std::shared_ptr<grpc::Channel> channel)
+      : channel_(std::move(channel)), stub_(rmq::MessagingService::NewStub(channel_)) {
+  }
+
+  ~SessionImpl() override = default;
+
+  void queryRoute(absl::flat_hash_map<std::string, std::string> metadata, const rmq::QueryRouteRequest* request,
+                  std::function<void(const grpc::Status&, const rmq::QueryRouteResponse&)> callback) override;
+
+private:
+  std::shared_ptr<grpc::Channel> channel_;
+  std::unique_ptr<rmq::MessagingService::Stub> stub_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file