You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:04 UTC
[hbase] 16/133: HBASE-15687 Allow decoding more than GetResponse
from the server
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ed06d9f303a53fe6a157352589ef9dacfc77fd15
Author: Elliott Clark <ec...@apache.org>
AuthorDate: Wed Apr 27 15:27:09 2016 -0700
HBASE-15687 Allow decoding more than GetResponse from the server
Summary:
We'll need more than gets for the client to be usable. So now the
Request class contains the protobufs needed to encode and decode
RPCs.
I also added some helper methods to create initial requests.
Test Plan: It compiles and still gets data from HBase meta
Differential Revision: https://reviews.facebook.net/D57327
---
hbase-native-client/connection/BUCK | 1 +
.../connection/client-dispatcher.cc | 6 +--
hbase-native-client/connection/client-dispatcher.h | 6 +--
hbase-native-client/connection/client-handler.cc | 30 ++++++++++++---
hbase-native-client/connection/client-handler.h | 18 +++++++--
.../connection/connection-factory.cc | 6 +--
.../connection/connection-factory.h | 2 +-
hbase-native-client/connection/pipeline.h | 3 +-
hbase-native-client/connection/request.cc | 45 ++++++++++++++++++++++
hbase-native-client/connection/request.h | 25 ++++++++++--
hbase-native-client/core/simple-client.cc | 13 +++----
11 files changed, 123 insertions(+), 32 deletions(-)
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index 5067708..d393885 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -32,6 +32,7 @@ cxx_library(name="connection",
"client-handler.cc",
"connection-factory.cc",
"pipeline.cc",
+ "request.cc",
],
deps=[
"//if:if",
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 25cff7d..eea0a17 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -35,16 +35,16 @@ void ClientDispatcher::read(Context *ctx, Response in) {
p.setValue(in);
}
-Future<Response> ClientDispatcher::operator()(Request arg) {
+Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
auto call_id = ++current_call_id_;
- arg.set_call_id(call_id);
+ arg->set_call_id(call_id);
auto &p = requests_[call_id];
auto f = p.getFuture();
p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
this->requests_.erase(call_id);
});
- this->pipeline_->write(arg);
+ this->pipeline_->write(std::move(arg));
return f;
}
diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h
index 89c7119..877e877 100644
--- a/hbase-native-client/connection/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -27,11 +27,11 @@
namespace hbase {
class ClientDispatcher
- : public wangle::ClientDispatcherBase<SerializePipeline, Request,
- Response> {
+ : public wangle::ClientDispatcherBase<SerializePipeline,
+ std::unique_ptr<Request>, Response> {
public:
void read(Context *ctx, Response in) override;
- folly::Future<Response> operator()(Request arg) override;
+ folly::Future<Response> operator()(std::unique_ptr<Request> arg) override;
folly::Future<folly::Unit> close(Context *ctx) override;
folly::Future<folly::Unit> close() override;
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 205993a7..abcf5c1 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -47,14 +47,30 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
LOG(INFO) << "Read ResponseHeader size=" << used_bytes
<< " call_id=" << header.call_id()
<< " has_exception=" << header.has_exception();
+
+ // Get the response protobuf from the map
+ auto search = resp_msgs_.find(header.call_id());
+ // It's an error if it's not there.
+ CHECK(search != resp_msgs_.end());
+ auto resp_msg = search->second;
+ CHECK(resp_msg != nullptr);
+
+ // Make sure we don't leak the protobuf
+ resp_msgs_.erase(search);
+
+ // set the call_id.
+ // This will be used by the dispatcher to match up
+ // the promise with the response.
received.set_call_id(header.call_id());
+ // If there was an exception then there's no
+ // data left on the wire.
if (header.has_exception() == false) {
buf->trimStart(used_bytes);
- // For now assume that everything was a get.
- // We'll need to set this up later.
- received.set_response(std::make_shared<GetResponse>());
- used_bytes = deser_.parse_delimited(buf.get(), received.response().get());
+ used_bytes = deser_.parse_delimited(buf.get(), resp_msg.get());
+ // Make sure that bytes were parsed.
+ CHECK(used_bytes == buf->length());
+ received.set_response(resp_msg);
}
ctx->fireRead(std::move(received));
}
@@ -62,7 +78,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
// TODO(eclark): Figure out how to handle the
// network errors that are going to come.
-Future<Unit> ClientHandler::write(Context *ctx, Request r) {
+Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
// Keep track of if we have sent the header.
if (UNLIKELY(need_send_header_)) {
need_send_header_ = false;
@@ -78,5 +94,7 @@ Future<Unit> ClientHandler::write(Context *ctx, Request r) {
ctx->fireWrite(std::move(pre));
}
- return ctx->fireWrite(ser_.request(r.call_id(), r.method(), r.msg()));
+ resp_msgs_[r->call_id()] = r->resp_msg();
+ return ctx->fireWrite(
+ ser_.request(r->call_id(), r->method(), r->req_msg().get()));
}
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index dbaf5a0..41bb883 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -30,20 +30,30 @@ namespace hbase {
class Request;
class Response;
}
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
namespace hbase {
-class ClientHandler
- : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, Request,
- std::unique_ptr<folly::IOBuf>> {
+class ClientHandler : public wangle::Handler<std::unique_ptr<folly::IOBuf>,
+ Response, std::unique_ptr<Request>,
+ std::unique_ptr<folly::IOBuf>> {
public:
ClientHandler(std::string user_name);
void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override;
- folly::Future<folly::Unit> write(Context *ctx, Request r) override;
+ folly::Future<folly::Unit> write(Context *ctx,
+ std::unique_ptr<Request> r) override;
private:
bool need_send_header_ = true;
std::string user_name_;
ClientSerializer ser_;
ClientDeserializer deser_;
+
+ // in flight requests
+ std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>>
+ resp_msgs_;
};
} // namespace hbase
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 5d1b0da..7073f9d 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -45,14 +45,14 @@ ConnectionFactory::ConnectionFactory() {
bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
}
-std::shared_ptr<Service<Request, Response>>
+std::shared_ptr<Service<std::unique_ptr<Request>, Response>>
ConnectionFactory::make_connection(std::string host, int port) {
// Connect to a given server
// Then when connected create a ClientDispactcher.
auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get();
auto dispatcher = std::make_shared<ClientDispatcher>();
dispatcher->setPipeline(pipeline);
- auto service =
- std::make_shared<CloseOnReleaseFilter<Request, Response>>(dispatcher);
+ auto service = std::make_shared<
+ CloseOnReleaseFilter<std::unique_ptr<Request>, Response>>(dispatcher);
return service;
}
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 73ac032..8d1d2f0 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -31,7 +31,7 @@ namespace hbase {
class ConnectionFactory {
public:
ConnectionFactory();
- std::shared_ptr<wangle::Service<Request, Response>>
+ std::shared_ptr<wangle::Service<std::unique_ptr<Request>, Response>>
make_connection(std::string host, int port);
private:
diff --git a/hbase-native-client/connection/pipeline.h b/hbase-native-client/connection/pipeline.h
index 8114fab..6c4f4ff 100644
--- a/hbase-native-client/connection/pipeline.h
+++ b/hbase-native-client/connection/pipeline.h
@@ -26,7 +26,8 @@
#include "utils/user-util.h"
namespace hbase {
-using SerializePipeline = wangle::Pipeline<folly::IOBufQueue &, Request>;
+using SerializePipeline =
+ wangle::Pipeline<folly::IOBufQueue &, std::unique_ptr<Request>>;
class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
public:
diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc
new file mode 100644
index 0000000..50ea029
--- /dev/null
+++ b/hbase-native-client/connection/request.cc
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/request.h"
+
+#include "if/Client.pb.h"
+
+using namespace hbase;
+
+Request::Request(std::shared_ptr<google::protobuf::Message> req,
+ std::shared_ptr<google::protobuf::Message> resp,
+ std::string method)
+ : req_msg_(req), resp_msg_(resp), method_(method), call_id_(0) {}
+
+std::unique_ptr<Request> Request::get() {
+ return std::make_unique<Request>(std::make_shared<hbase::pb::GetRequest>(),
+ std::make_shared<hbase::pb::GetResponse>(),
+ "Get");
+}
+std::unique_ptr<Request> Request::mutate() {
+ return std::make_unique<Request>(std::make_shared<hbase::pb::MutateRequest>(),
+ std::make_shared<hbase::pb::MutateResponse>(),
+ "Mutate");
+}
+std::unique_ptr<Request> Request::scan() {
+ return std::make_unique<Request>(std::make_shared<hbase::pb::ScanRequest>(),
+ std::make_shared<hbase::pb::ScanResponse>(),
+ "Scan");
+}
diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h
index e9e3e88..743c469 100644
--- a/hbase-native-client/connection/request.h
+++ b/hbase-native-client/connection/request.h
@@ -21,22 +21,39 @@
#include <google/protobuf/message.h>
#include <cstdint>
+#include <memory>
#include <string>
namespace hbase {
class Request {
public:
- Request() : call_id_(0) {}
+ static std::unique_ptr<Request> get();
+ static std::unique_ptr<Request> mutate();
+ static std::unique_ptr<Request> scan();
+
+ Request(std::shared_ptr<google::protobuf::Message> req,
+ std::shared_ptr<google::protobuf::Message> resp, std::string method);
+
uint32_t call_id() { return call_id_; }
void set_call_id(uint32_t call_id) { call_id_ = call_id; }
- google::protobuf::Message *msg() { return msg_.get(); }
- void set_msg(std::shared_ptr<google::protobuf::Message> msg) { msg_ = msg; }
+
+ std::shared_ptr<google::protobuf::Message> req_msg() { return req_msg_; }
+ std::shared_ptr<google::protobuf::Message> resp_msg() { return resp_msg_; }
+
+ void set_req_msg(std::shared_ptr<google::protobuf::Message> msg) {
+ req_msg_ = msg;
+ }
+ void set_resp_msg(std::shared_ptr<google::protobuf::Message> msg) {
+ resp_msg_ = msg;
+ }
+
std::string method() { return method_; }
void set_method(std::string method) { method_ = method; }
private:
uint32_t call_id_;
- std::shared_ptr<google::protobuf::Message> msg_ = nullptr;
+ std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr;
+ std::shared_ptr<google::protobuf::Message> resp_msg_ = nullptr;
std::string method_ = "Get";
};
} // namespace hbase
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 8b2fae5..2cb6200 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -58,24 +58,23 @@ int main(int argc, char *argv[]) {
auto conn = cf.make_connection(result.host_name(), result.port());
// Send the request
- Request r;
+ auto r = Request::get();
// This is a get request so make that
- auto msg = make_shared<hbase::pb::GetRequest>();
+ auto req_msg = static_pointer_cast<hbase::pb::GetRequest>(r->req_msg());
// Set what region
- msg->mutable_region()->set_value(FLAGS_region);
+ req_msg->mutable_region()->set_value(FLAGS_region);
// It's always this.
- msg->mutable_region()->set_type(
+ req_msg->mutable_region()->set_type(
RegionSpecifier_RegionSpecifierType::
RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
// What row.
- msg->mutable_get()->set_row(FLAGS_row);
+ req_msg->mutable_get()->set_row(FLAGS_row);
// Send it.
- r.set_msg(msg);
- auto resp = (*conn)(r).get(milliseconds(5000));
+ auto resp = (*conn)(std::move(r)).get(milliseconds(5000));
auto get_resp = std::static_pointer_cast<GetResponse>(resp.response());
cout << "GetResponse has_result = " << get_resp->has_result() << '\n';