You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:04 UTC

[hbase] 16/133: HBASE-15687 Allow decoding more than GetResponse from the server

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit ed06d9f303a53fe6a157352589ef9dacfc77fd15
Author: Elliott Clark <ec...@apache.org>
AuthorDate: Wed Apr 27 15:27:09 2016 -0700

    HBASE-15687 Allow decoding more than GetResponse from the server
    
    Summary:
    We'll need more than gets for the client to be usable. So now the
    Request class contains the protobufs needed to encode and decode
    RPCs.
    
    I also added some helper methods to create initial requests.
    
    Test Plan: It compiles and still gets data from HBase meta
    
    Differential Revision: https://reviews.facebook.net/D57327
---
 hbase-native-client/connection/BUCK                |  1 +
 .../connection/client-dispatcher.cc                |  6 +--
 hbase-native-client/connection/client-dispatcher.h |  6 +--
 hbase-native-client/connection/client-handler.cc   | 30 ++++++++++++---
 hbase-native-client/connection/client-handler.h    | 18 +++++++--
 .../connection/connection-factory.cc               |  6 +--
 .../connection/connection-factory.h                |  2 +-
 hbase-native-client/connection/pipeline.h          |  3 +-
 hbase-native-client/connection/request.cc          | 45 ++++++++++++++++++++++
 hbase-native-client/connection/request.h           | 25 ++++++++++--
 hbase-native-client/core/simple-client.cc          | 13 +++----
 11 files changed, 123 insertions(+), 32 deletions(-)

diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index 5067708..d393885 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -32,6 +32,7 @@ cxx_library(name="connection",
                 "client-handler.cc",
                 "connection-factory.cc",
                 "pipeline.cc",
+                "request.cc",
             ],
             deps=[
                 "//if:if",
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 25cff7d..eea0a17 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -35,16 +35,16 @@ void ClientDispatcher::read(Context *ctx, Response in) {
   p.setValue(in);
 }
 
-Future<Response> ClientDispatcher::operator()(Request arg) {
+Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
   auto call_id = ++current_call_id_;
 
-  arg.set_call_id(call_id);
+  arg->set_call_id(call_id);
   auto &p = requests_[call_id];
   auto f = p.getFuture();
   p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
     this->requests_.erase(call_id);
   });
-  this->pipeline_->write(arg);
+  this->pipeline_->write(std::move(arg));
 
   return f;
 }
diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h
index 89c7119..877e877 100644
--- a/hbase-native-client/connection/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -27,11 +27,11 @@
 
 namespace hbase {
 class ClientDispatcher
-    : public wangle::ClientDispatcherBase<SerializePipeline, Request,
-                                          Response> {
+    : public wangle::ClientDispatcherBase<SerializePipeline,
+                                          std::unique_ptr<Request>, Response> {
 public:
   void read(Context *ctx, Response in) override;
-  folly::Future<Response> operator()(Request arg) override;
+  folly::Future<Response> operator()(std::unique_ptr<Request> arg) override;
   folly::Future<folly::Unit> close(Context *ctx) override;
   folly::Future<folly::Unit> close() override;
 
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 205993a7..abcf5c1 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -47,14 +47,30 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
     LOG(INFO) << "Read ResponseHeader size=" << used_bytes
               << " call_id=" << header.call_id()
               << " has_exception=" << header.has_exception();
+
+    // Get the response protobuf from the map
+    auto search = resp_msgs_.find(header.call_id());
+    // It's an error if it's not there.
+    CHECK(search != resp_msgs_.end());
+    auto resp_msg = search->second;
+    CHECK(resp_msg != nullptr);
+
+    // Make sure we don't leak the protobuf
+    resp_msgs_.erase(search);
+
+    // set the call_id.
+    // This will be used by the dispatcher to match up
+    // the promise with the response.
     received.set_call_id(header.call_id());
 
+    // If there was an exception then there's no
+    // data left on the wire.
     if (header.has_exception() == false) {
       buf->trimStart(used_bytes);
-      // For now assume that everything was a get.
-      // We'll need to set this up later.
-      received.set_response(std::make_shared<GetResponse>());
-      used_bytes = deser_.parse_delimited(buf.get(), received.response().get());
+      used_bytes = deser_.parse_delimited(buf.get(), resp_msg.get());
+      // Make sure that bytes were parsed.
+      CHECK(used_bytes == buf->length());
+      received.set_response(resp_msg);
     }
     ctx->fireRead(std::move(received));
   }
@@ -62,7 +78,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
 
 // TODO(eclark): Figure out how to handle the
 // network errors that are going to come.
-Future<Unit> ClientHandler::write(Context *ctx, Request r) {
+Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
   // Keep track of if we have sent the header.
   if (UNLIKELY(need_send_header_)) {
     need_send_header_ = false;
@@ -78,5 +94,7 @@ Future<Unit> ClientHandler::write(Context *ctx, Request r) {
     ctx->fireWrite(std::move(pre));
   }
 
-  return ctx->fireWrite(ser_.request(r.call_id(), r.method(), r.msg()));
+  resp_msgs_[r->call_id()] = r->resp_msg();
+  return ctx->fireWrite(
+      ser_.request(r->call_id(), r->method(), r->req_msg().get()));
 }
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index dbaf5a0..41bb883 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -30,20 +30,30 @@ namespace hbase {
 class Request;
 class Response;
 }
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
 
 namespace hbase {
-class ClientHandler
-    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, Request,
-                             std::unique_ptr<folly::IOBuf>> {
+class ClientHandler : public wangle::Handler<std::unique_ptr<folly::IOBuf>,
+                                             Response, std::unique_ptr<Request>,
+                                             std::unique_ptr<folly::IOBuf>> {
 public:
   ClientHandler(std::string user_name);
   void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override;
-  folly::Future<folly::Unit> write(Context *ctx, Request r) override;
+  folly::Future<folly::Unit> write(Context *ctx,
+                                   std::unique_ptr<Request> r) override;
 
 private:
   bool need_send_header_ = true;
   std::string user_name_;
   ClientSerializer ser_;
   ClientDeserializer deser_;
+
+  // in flight requests
+  std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>>
+      resp_msgs_;
 };
 } // namespace hbase
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 5d1b0da..7073f9d 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -45,14 +45,14 @@ ConnectionFactory::ConnectionFactory() {
   bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
 }
 
-std::shared_ptr<Service<Request, Response>>
+std::shared_ptr<Service<std::unique_ptr<Request>, Response>>
 ConnectionFactory::make_connection(std::string host, int port) {
   // Connect to a given server
   // Then when connected create a ClientDispactcher.
   auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get();
   auto dispatcher = std::make_shared<ClientDispatcher>();
   dispatcher->setPipeline(pipeline);
-  auto service =
-      std::make_shared<CloseOnReleaseFilter<Request, Response>>(dispatcher);
+  auto service = std::make_shared<
+      CloseOnReleaseFilter<std::unique_ptr<Request>, Response>>(dispatcher);
   return service;
 }
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 73ac032..8d1d2f0 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -31,7 +31,7 @@ namespace hbase {
 class ConnectionFactory {
 public:
   ConnectionFactory();
-  std::shared_ptr<wangle::Service<Request, Response>>
+  std::shared_ptr<wangle::Service<std::unique_ptr<Request>, Response>>
   make_connection(std::string host, int port);
 
 private:
diff --git a/hbase-native-client/connection/pipeline.h b/hbase-native-client/connection/pipeline.h
index 8114fab..6c4f4ff 100644
--- a/hbase-native-client/connection/pipeline.h
+++ b/hbase-native-client/connection/pipeline.h
@@ -26,7 +26,8 @@
 #include "utils/user-util.h"
 
 namespace hbase {
-using SerializePipeline = wangle::Pipeline<folly::IOBufQueue &, Request>;
+using SerializePipeline =
+    wangle::Pipeline<folly::IOBufQueue &, std::unique_ptr<Request>>;
 
 class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
 public:
diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc
new file mode 100644
index 0000000..50ea029
--- /dev/null
+++ b/hbase-native-client/connection/request.cc
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/request.h"
+
+#include "if/Client.pb.h"
+
+using namespace hbase;
+
+Request::Request(std::shared_ptr<google::protobuf::Message> req,
+                 std::shared_ptr<google::protobuf::Message> resp,
+                 std::string method)
+    : req_msg_(req), resp_msg_(resp), method_(method), call_id_(0) {}
+
+std::unique_ptr<Request> Request::get() {
+  return std::make_unique<Request>(std::make_shared<hbase::pb::GetRequest>(),
+                                  std::make_shared<hbase::pb::GetResponse>(),
+                                  "Get");
+}
+std::unique_ptr<Request> Request::mutate() {
+  return std::make_unique<Request>(std::make_shared<hbase::pb::MutateRequest>(),
+                                  std::make_shared<hbase::pb::MutateResponse>(),
+                                  "Mutate");
+}
+std::unique_ptr<Request> Request::scan() {
+  return std::make_unique<Request>(std::make_shared<hbase::pb::ScanRequest>(),
+                                  std::make_shared<hbase::pb::ScanResponse>(),
+                                  "Scan");
+}
diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h
index e9e3e88..743c469 100644
--- a/hbase-native-client/connection/request.h
+++ b/hbase-native-client/connection/request.h
@@ -21,22 +21,39 @@
 #include <google/protobuf/message.h>
 
 #include <cstdint>
+#include <memory>
 #include <string>
 
 namespace hbase {
 class Request {
 public:
-  Request() : call_id_(0) {}
+  static std::unique_ptr<Request> get();
+  static std::unique_ptr<Request> mutate();
+  static std::unique_ptr<Request> scan();
+
+  Request(std::shared_ptr<google::protobuf::Message> req,
+          std::shared_ptr<google::protobuf::Message> resp, std::string method);
+
   uint32_t call_id() { return call_id_; }
   void set_call_id(uint32_t call_id) { call_id_ = call_id; }
-  google::protobuf::Message *msg() { return msg_.get(); }
-  void set_msg(std::shared_ptr<google::protobuf::Message> msg) { msg_ = msg; }
+
+  std::shared_ptr<google::protobuf::Message> req_msg() { return req_msg_; }
+  std::shared_ptr<google::protobuf::Message> resp_msg() { return resp_msg_; }
+
+  void set_req_msg(std::shared_ptr<google::protobuf::Message> msg) {
+    req_msg_ = msg;
+  }
+  void set_resp_msg(std::shared_ptr<google::protobuf::Message> msg) {
+    resp_msg_ = msg;
+  }
+
   std::string method() { return method_; }
   void set_method(std::string method) { method_ = method; }
 
 private:
   uint32_t call_id_;
-  std::shared_ptr<google::protobuf::Message> msg_ = nullptr;
+  std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr;
+  std::shared_ptr<google::protobuf::Message> resp_msg_ = nullptr;
   std::string method_ = "Get";
 };
 } // namespace hbase
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 8b2fae5..2cb6200 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -58,24 +58,23 @@ int main(int argc, char *argv[]) {
   auto conn = cf.make_connection(result.host_name(), result.port());
 
   // Send the request
-  Request r;
+  auto r = Request::get();
 
   // This is a get request so make that
-  auto msg = make_shared<hbase::pb::GetRequest>();
+  auto req_msg = static_pointer_cast<hbase::pb::GetRequest>(r->req_msg());
 
   // Set what region
-  msg->mutable_region()->set_value(FLAGS_region);
+  req_msg->mutable_region()->set_value(FLAGS_region);
   // It's always this.
-  msg->mutable_region()->set_type(
+  req_msg->mutable_region()->set_type(
       RegionSpecifier_RegionSpecifierType::
           RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
 
   // What row.
-  msg->mutable_get()->set_row(FLAGS_row);
+  req_msg->mutable_get()->set_row(FLAGS_row);
 
   // Send it.
-  r.set_msg(msg);
-  auto resp = (*conn)(r).get(milliseconds(5000));
+  auto resp = (*conn)(std::move(r)).get(milliseconds(5000));
 
   auto get_resp = std::static_pointer_cast<GetResponse>(resp.response());
   cout << "GetResponse has_result = " << get_resp->has_result() << '\n';