You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:50 UTC
[hbase] 62/133: HBASE-17585 [C++] Use KVCodec in the RPC
request/response
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit a018834018f3356409342d6f92664def38b3a132
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Mon Feb 13 18:50:36 2017 -0800
HBASE-17585 [C++] Use KVCodec in the RPC request/response
---
.../connection/client-dispatcher.cc | 10 ++--
hbase-native-client/connection/client-dispatcher.h | 9 ++--
hbase-native-client/connection/client-handler.cc | 36 ++++++++++----
hbase-native-client/connection/client-handler.h | 7 +--
.../connection/connection-factory.cc | 5 +-
.../connection/connection-factory.h | 3 +-
.../connection/connection-pool-test.cc | 10 ++--
hbase-native-client/connection/connection-pool.cc | 11 +++--
hbase-native-client/connection/connection-pool.h | 3 +-
hbase-native-client/connection/pipeline.cc | 5 +-
hbase-native-client/connection/pipeline.h | 4 +-
hbase-native-client/connection/response.h | 11 ++++-
hbase-native-client/connection/rpc-client.cc | 32 +++++++------
hbase-native-client/connection/rpc-client.h | 21 ++++----
hbase-native-client/connection/rpc-connection.h | 2 +-
hbase-native-client/connection/service.h | 2 +-
hbase-native-client/core/client-test.cc | 6 +--
hbase-native-client/core/client.cc | 9 +++-
hbase-native-client/core/client.h | 2 +
hbase-native-client/core/location-cache-test.cc | 10 ++--
hbase-native-client/core/location-cache.cc | 20 +-------
hbase-native-client/core/meta-utils.cc | 34 +++++++++++++
hbase-native-client/core/meta-utils.h | 7 +++
hbase-native-client/core/response_converter.cc | 56 ++++++++++++++++++++--
hbase-native-client/core/response_converter.h | 9 +++-
hbase-native-client/core/result.cc | 2 +-
hbase-native-client/core/result.h | 2 +-
hbase-native-client/core/simple-client.cc | 7 ++-
hbase-native-client/core/table.cc | 4 +-
.../serde/client-deserializer-test.cc | 8 ++--
.../serde/client-serializer-test.cc | 8 ++--
hbase-native-client/serde/rpc.cc | 14 +++++-
hbase-native-client/serde/rpc.h | 13 ++++-
33 files changed, 270 insertions(+), 112 deletions(-)
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 1ace99c..626fc76 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -26,8 +26,8 @@ using namespace wangle;
ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {}
-void ClientDispatcher::read(Context *ctx, Response in) {
- auto call_id = in.call_id();
+void ClientDispatcher::read(Context *ctx, std::unique_ptr<Response> in) {
+ auto call_id = in->call_id();
auto search = requests_.find(call_id);
CHECK(search != requests_.end());
@@ -37,13 +37,13 @@ void ClientDispatcher::read(Context *ctx, Response in) {
// TODO(eclark): check if the response
// is an exception. If it is then set that.
- p.setValue(in);
+ p.setValue(std::move(in));
}
-Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
+Future<std::unique_ptr<Response>> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
auto call_id = current_call_id_++;
arg->set_call_id(call_id);
- requests_.insert(call_id, Promise<Response>{});
+ requests_.insert(call_id, Promise<std::unique_ptr<Response>>{});
auto &p = requests_.find(call_id)->second;
auto f = p.getFuture();
p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h
index 0489717..857042c 100644
--- a/hbase-native-client/connection/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -36,21 +36,22 @@ namespace hbase {
* future.
*/
class ClientDispatcher
- : public wangle::ClientDispatcherBase<SerializePipeline, std::unique_ptr<Request>, Response> {
+ : public wangle::ClientDispatcherBase<SerializePipeline, std::unique_ptr<Request>,
+ std::unique_ptr<Response>> {
public:
/** Create a new ClientDispatcher */
ClientDispatcher();
/** Read a response off the pipeline. */
- void read(Context *ctx, Response in) override;
+ void read(Context *ctx, std::unique_ptr<Response> in) override;
/** Take a request as a call and send it down the pipeline. */
- folly::Future<Response> operator()(std::unique_ptr<Request> arg) override;
+ folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> arg) override;
/** Close the dispatcher and the associated pipeline. */
folly::Future<folly::Unit> close(Context *ctx) override;
/** Close the dispatcher and the associated pipeline. */
folly::Future<folly::Unit> close() override;
private:
- folly::AtomicHashMap<uint32_t, folly::Promise<Response>> requests_;
+ folly::AtomicHashMap<uint32_t, folly::Promise<std::unique_ptr<Response>>> requests_;
// Start at some number way above what could
// be there for un-initialized call id counters.
//
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 5a6dce2..af84572 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -36,9 +36,9 @@ using hbase::pb::ResponseHeader;
using hbase::pb::GetResponse;
using google::protobuf::Message;
-ClientHandler::ClientHandler(std::string user_name)
+ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec)
: user_name_(user_name),
- serde_(),
+ serde_(codec),
once_flag_(std::make_unique<std::once_flag>()),
resp_msgs_(
make_unique<folly::AtomicHashMap<uint32_t, std::shared_ptr<google::protobuf::Message>>>(
@@ -47,12 +47,12 @@ ClientHandler::ClientHandler(std::string user_name)
void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
if (LIKELY(buf != nullptr)) {
buf->coalesce();
- Response received;
+ auto received = std::make_unique<Response>();
ResponseHeader header;
int used_bytes = serde_.ParseDelimited(buf.get(), &header);
- LOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
- << " has_exception=" << header.has_exception();
+ VLOG(1) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
+ << " has_exception=" << header.has_exception();
// Get the response protobuf from the map
auto search = resp_msgs_->find(header.call_id());
@@ -67,17 +67,34 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
// set the call_id.
// This will be used to by the dispatcher to match up
// the promise with the response.
- received.set_call_id(header.call_id());
+ received->set_call_id(header.call_id());
// If there was an exception then there's no
// data left on the wire.
if (header.has_exception() == false) {
buf->trimStart(used_bytes);
+
+ int cell_block_length = 0;
used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get());
+ if (header.has_cell_block_meta() && header.cell_block_meta().has_length()) {
+ cell_block_length = header.cell_block_meta().length();
+ }
+
+ VLOG(3) << "Read RPCResponse, buf length:" << buf->length()
+ << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length;
+
// Make sure that bytes were parsed.
- CHECK(used_bytes == buf->length());
- received.set_resp_msg(resp_msg);
+ CHECK((used_bytes + cell_block_length) == buf->length());
+
+ if (cell_block_length > 0) {
+ auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length);
+ received->set_cell_scanner(std::move(cell_scanner));
+ }
+
+ received->set_resp_msg(resp_msg);
}
+ // TODO: set exception in Response here
+
ctx->fireRead(std::move(received));
}
}
@@ -94,6 +111,9 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
// Now store the call id to response.
resp_msgs_->insert(r->call_id(), r->resp_msg());
+
+ VLOG(1) << "Writing RPC Request with call_id:" << r->call_id();
+
// Send the data down the pipeline.
return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
}
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index d860cc1..afb8e62 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -27,6 +27,7 @@
#include <string>
#include <utility>
+#include "serde/codec.h"
#include "serde/rpc.h"
// Forward decs.
@@ -51,14 +52,14 @@ namespace hbase {
* on first request.
*/
class ClientHandler
- : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, std::unique_ptr<Request>,
- std::unique_ptr<folly::IOBuf>> {
+ : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Response>,
+ std::unique_ptr<Request>, std::unique_ptr<folly::IOBuf>> {
public:
/**
* Create the handler
* @param user_name the user name of the user running this process.
*/
- explicit ClientHandler(std::string user_name);
+ explicit ClientHandler(std::string user_name, std::shared_ptr<Codec> codec);
/**
* Get bytes from the wire.
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index ff83212..6aba351 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -26,8 +26,9 @@
using namespace folly;
using namespace hbase;
-ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool)
- : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>()) {}
+ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
+ std::shared_ptr<Codec> codec)
+ : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() {
auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>();
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index da44c35..0d1e0d0 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -40,7 +40,8 @@ class ConnectionFactory {
* Constructor.
* There should only be one ConnectionFactory per client.
*/
- explicit ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool);
+ ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
+ std::shared_ptr<Codec> codec);
/** Default Desctructor */
virtual ~ConnectionFactory() = default;
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
index 0930095..623ce3c 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -36,7 +36,7 @@ using hbase::ConnectionId;
class MockConnectionFactory : public ConnectionFactory {
public:
- MockConnectionFactory() : ConnectionFactory(nullptr) {}
+ MockConnectionFactory() : ConnectionFactory(nullptr, nullptr) {}
MOCK_METHOD0(MakeBootstrap, std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>());
MOCK_METHOD3(Connect, std::shared_ptr<HBaseService>(
std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>,
@@ -47,17 +47,17 @@ class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {};
class MockServiceBase : public HBaseService {
public:
- folly::Future<Response> operator()(std::unique_ptr<Request> req) override {
+ folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> req) override {
return do_operation(req.get());
}
- virtual folly::Future<Response> do_operation(Request *req) {
- return folly::makeFuture<Response>(Response{});
+ virtual folly::Future<std::unique_ptr<Response>> do_operation(Request *req) {
+ return folly::makeFuture<std::unique_ptr<Response>>(std::make_unique<Response>());
}
};
class MockService : public MockServiceBase {
public:
- MOCK_METHOD1(do_operation, folly::Future<Response>(Request *));
+ MOCK_METHOD1(do_operation, folly::Future<std::unique_ptr<Response>>(Request *));
};
TEST(TestConnectionPool, TestOnlyCreateOnce) {
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 15dd64e..6635a6d 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -33,8 +33,9 @@ using hbase::HBaseService;
using folly::SharedMutexWritePriority;
using folly::SocketAddress;
-ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
- : cf_(std::make_shared<ConnectionFactory>(io_executor)),
+ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+ std::shared_ptr<Codec> codec)
+ : cf_(std::make_shared<ConnectionFactory>(io_executor, codec)),
clients_(),
connections_(),
map_mutex_() {}
@@ -88,12 +89,12 @@ std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection(
auto clientBootstrap = cf_->MakeBootstrap();
auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port());
- auto conneciton = std::make_shared<RpcConnection>(remote_id, dispatcher);
+ auto connection = std::make_shared<RpcConnection>(remote_id, dispatcher);
- connections_.insert(std::make_pair(remote_id, conneciton));
+ connections_.insert(std::make_pair(remote_id, connection));
clients_.insert(std::make_pair(remote_id, clientBootstrap));
- return conneciton;
+ return connection;
}
}
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 1f2a182..23e5e9a 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -46,7 +46,8 @@ namespace hbase {
class ConnectionPool {
public:
/** Create connection pool wit default connection factory */
- explicit ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
+ ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+ std::shared_ptr<Codec> codec);
/**
* Desctructor.
diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc
index 14ad73c..00dc05c 100644
--- a/hbase-native-client/connection/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -30,7 +30,8 @@ using namespace folly;
using namespace hbase;
using namespace wangle;
-RpcPipelineFactory::RpcPipelineFactory() : user_util_() {}
+RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec)
+ : user_util_(), codec_(codec) {}
SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
std::shared_ptr<AsyncTransportWrapper> sock) {
@@ -38,7 +39,7 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
pipeline->addBack(AsyncSocketHandler{sock});
pipeline->addBack(EventBaseHandler{});
pipeline->addBack(LengthFieldBasedFrameDecoder{});
- pipeline->addBack(ClientHandler{user_util_.user_name()});
+ pipeline->addBack(ClientHandler{user_util_.user_name(), codec_});
pipeline->finalize();
return pipeline;
}
diff --git a/hbase-native-client/connection/pipeline.h b/hbase-native-client/connection/pipeline.h
index 343219d..ea40cfd 100644
--- a/hbase-native-client/connection/pipeline.h
+++ b/hbase-native-client/connection/pipeline.h
@@ -25,6 +25,7 @@
#include "connection/request.h"
#include "connection/response.h"
+#include "serde/codec.h"
#include "utils/user-util.h"
namespace hbase {
@@ -40,7 +41,7 @@ class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
/**
* Constructor. This will create user util.
*/
- RpcPipelineFactory();
+ explicit RpcPipelineFactory(std::shared_ptr<Codec> codec);
/**
* Create a new pipeline.
@@ -55,5 +56,6 @@ class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
private:
UserUtil user_util_;
+ std::shared_ptr<Codec> codec_;
};
} // namespace hbase
diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h
index 560387c..1d60fed 100644
--- a/hbase-native-client/connection/response.h
+++ b/hbase-native-client/connection/response.h
@@ -22,6 +22,8 @@
#include <memory>
#include <utility>
+#include "serde/cell-scanner.h"
+
// Forward
namespace google {
namespace protobuf {
@@ -42,7 +44,7 @@ class Response {
* Constructor.
* Initinalizes the call id to 0. 0 should never be a valid call id.
*/
- Response() : call_id_(0), resp_msg_(nullptr) {}
+ Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {}
/** Get the call_id */
uint32_t call_id() { return call_id_; }
@@ -62,8 +64,15 @@ class Response {
resp_msg_ = std::move(response);
}
+ void set_cell_scanner(std::unique_ptr<CellScanner> cell_scanner) {
+ cell_scanner_ = std::move(cell_scanner);
+ }
+
+ const std::unique_ptr<CellScanner>& cell_scanner() const { return cell_scanner_; }
+
private:
uint32_t call_id_;
std::shared_ptr<google::protobuf::Message> resp_msg_;
+ std::unique_ptr<CellScanner> cell_scanner_;
};
} // namespace hbase
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 7621193..cfbda3a 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -39,38 +39,40 @@ class RpcChannelImplementation : public AbstractRpcChannel {
};
} // namespace hbase
-RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+ std::shared_ptr<Codec> codec)
: io_executor_(io_executor) {
- cp_ = std::make_shared<ConnectionPool>(io_executor_);
+ cp_ = std::make_shared<ConnectionPool>(io_executor_, codec);
}
void RpcClient::Close() { io_executor_->stop(); }
-std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
+std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
std::unique_ptr<Request> req,
std::shared_ptr<User> ticket) {
- return std::make_shared<Response>(AsyncCall(host, port, std::move(req), ticket).get());
+ return AsyncCall(host, port, std::move(req), ticket).get();
}
-std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
+std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
std::unique_ptr<Request> req,
std::shared_ptr<User> ticket,
const std::string& service_name) {
- return std::make_shared<Response>(
- AsyncCall(host, port, std::move(req), ticket, service_name).get());
+ return AsyncCall(host, port, std::move(req), ticket, service_name).get();
}
-folly::Future<Response> RpcClient::AsyncCall(const std::string& host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<User> ticket) {
+folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
+ uint16_t port,
+ std::unique_ptr<Request> req,
+ std::shared_ptr<User> ticket) {
auto remote_id = std::make_shared<ConnectionId>(host, port, ticket);
return GetConnection(remote_id)->SendRequest(std::move(req));
}
-folly::Future<Response> RpcClient::AsyncCall(const std::string& host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<User> ticket,
- const std::string& service_name) {
+folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
+ uint16_t port,
+ std::unique_ptr<Request> req,
+ std::shared_ptr<User> ticket,
+ const std::string& service_name) {
auto remote_id = std::make_shared<ConnectionId>(host, port, ticket, service_name);
return GetConnection(remote_id)->SendRequest(std::move(req));
}
@@ -99,5 +101,5 @@ void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* contro
std::unique_ptr<Request> req = std::make_unique<Request>(shared_req, shared_resp, method->name());
AsyncCall(host, port, std::move(req), ticket, method->service()->name())
- .then([done, this](Response resp) { done->Run(); });
+ .then([done, this](std::unique_ptr<Response> resp) { done->Run(); });
}
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index aeb9b56..f4645a0 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -51,27 +51,28 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
friend class RpcChannelImplementation;
public:
- RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
+ RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+ std::shared_ptr<Codec> codec);
virtual ~RpcClient() { Close(); }
- virtual std::shared_ptr<Response> SyncCall(const std::string &host, uint16_t port,
+ virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port,
std::unique_ptr<Request> req,
std::shared_ptr<User> ticket);
- virtual std::shared_ptr<Response> SyncCall(const std::string &host, uint16_t port,
+ virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port,
std::unique_ptr<Request> req,
std::shared_ptr<User> ticket,
const std::string &service_name);
- virtual folly::Future<Response> AsyncCall(const std::string &host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<User> ticket);
+ virtual folly::Future<std::unique_ptr<Response>> AsyncCall(const std::string &host, uint16_t port,
+ std::unique_ptr<Request> req,
+ std::shared_ptr<User> ticket);
- virtual folly::Future<Response> AsyncCall(const std::string &host, uint16_t port,
- std::unique_ptr<Request> req,
- std::shared_ptr<User> ticket,
- const std::string &service_name);
+ virtual folly::Future<std::unique_ptr<Response>> AsyncCall(const std::string &host, uint16_t port,
+ std::unique_ptr<Request> req,
+ std::shared_ptr<User> ticket,
+ const std::string &service_name);
virtual void Close();
diff --git a/hbase-native-client/connection/rpc-connection.h b/hbase-native-client/connection/rpc-connection.h
index e2500b2..c37b1e0 100644
--- a/hbase-native-client/connection/rpc-connection.h
+++ b/hbase-native-client/connection/rpc-connection.h
@@ -41,7 +41,7 @@ class RpcConnection {
virtual std::shared_ptr<HBaseService> get_service() const { return hbase_service_; }
- virtual folly::Future<Response> SendRequest(std::unique_ptr<Request> req) {
+ virtual folly::Future<std::unique_ptr<Response>> SendRequest(std::unique_ptr<Request> req) {
return (*hbase_service_)(std::move(req));
}
diff --git a/hbase-native-client/connection/service.h b/hbase-native-client/connection/service.h
index 0d2e258..64d4f07 100644
--- a/hbase-native-client/connection/service.h
+++ b/hbase-native-client/connection/service.h
@@ -26,5 +26,5 @@
#include "connection/response.h"
namespace hbase {
-using HBaseService = wangle::Service<std::unique_ptr<Request>, Response>;
+using HBaseService = wangle::Service<std::unique_ptr<Request>, std::unique_ptr<Response>>;
} // namespace hbase
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index 28eec6f..0a45fff 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -116,9 +116,9 @@ TEST(Client, Get) {
// Using TestUtil to populate test data
hbase::TestUtil *test_util = new hbase::TestUtil();
- test_util->RunShellCmd("create 't', 'd'");
- test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'");
- test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'");
+ test_util->RunShellCmd(
+ "create 't', 'd'; put 't', 'test2', 'd:2', 'value2'; put 't', 'test2', 'd:extra', 'value for "
+ "extra'");
// Create TableName and Row to be fetched from HBase
auto tn = folly::to<hbase::pb::TableName>("t");
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index c1efd8b..685524f 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -46,7 +46,14 @@ void Client::init(const hbase::Configuration &conf) {
std::make_shared<wangle::CPUThreadPoolExecutor>(4); // TODO: read num threads from conf
io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
- rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_);
+ std::shared_ptr<Codec> codec = nullptr;
+ if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
+ std::string(KeyValueCodec::kJavaClassName)) {
+ codec = std::make_shared<hbase::KeyValueCodec>();
+ } else {
+ LOG(WARNING) << "Not using RPC Cell Codec";
+ }
+ rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec);
location_cache_ =
std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 730981d..0e436ba 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -30,6 +30,7 @@
#include "connection/rpc-client.h"
#include "core/configuration.h"
#include "core/hbase_configuration_loader.h"
+#include "core/keyvalue-codec.h"
#include "core/location-cache.h"
#include "core/table.h"
#include "if/Cell.pb.h"
@@ -70,6 +71,7 @@ class Client {
void init(const hbase::Configuration &conf);
const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
+ const std::string kRpcCodec = "hbase.client.rpc.codec";
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
std::shared_ptr<hbase::LocationCache> location_cache_;
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 1ad6c65..9a778c6 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -23,6 +23,7 @@
#include <chrono>
+#include "core/keyvalue-codec.h"
#include "if/HBase.pb.h"
#include "serde/table-name.h"
#include "test-util/test-util.h"
@@ -33,7 +34,8 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) {
TestUtil test_util{};
auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
- auto cp = std::make_shared<ConnectionPool>(io);
+ auto codec = std::make_shared<KeyValueCodec>();
+ auto cp = std::make_shared<ConnectionPool>(io, codec);
LocationCache cache{test_util.conf(), cpu, cp};
auto f = cache.LocateMeta();
auto result = f.get();
@@ -49,7 +51,8 @@ TEST(LocationCacheTest, TestGetRegionLocation) {
TestUtil test_util{};
auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
- auto cp = std::make_shared<ConnectionPool>(io);
+ auto codec = std::make_shared<KeyValueCodec>();
+ auto cp = std::make_shared<ConnectionPool>(io, codec);
LocationCache cache{test_util.conf(), cpu, cp};
// If there is no table this should throw an exception
@@ -68,7 +71,8 @@ TEST(LocationCacheTest, TestCaching) {
TestUtil test_util{};
auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
- auto cp = std::make_shared<ConnectionPool>(io);
+ auto codec = std::make_shared<KeyValueCodec>();
+ auto cp = std::make_shared<ConnectionPool>(io, codec);
LocationCache cache{test_util.conf(), cpu, cp};
auto tn_1 = folly::to<hbase::pb::TableName>("t1");
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index dab5deb..da9f64a 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -128,10 +128,10 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const Tabl
.then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) {
return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row)));
})
- .then([this](Response resp) {
+ .then([this](std::unique_ptr<Response> resp) {
// take the protobuf response and make it into
// a region location.
- return this->CreateLocation(std::move(resp));
+ return meta_util_.CreateLocation(std::move(*resp));
})
.then([tn, this](std::shared_ptr<RegionLocation> rl) {
// Make sure that the correct location was found.
@@ -166,22 +166,6 @@ Future<shared_ptr<RegionLocation>> LocationCache::LocateRegion(const hbase::pb::
}
}
-std::shared_ptr<RegionLocation> LocationCache::CreateLocation(const Response &resp) {
- auto resp_msg = static_pointer_cast<ScanResponse>(resp.resp_msg());
- auto &results = resp_msg->results().Get(0);
- auto &cells = results.cell();
-
- // TODO(eclark): There should probably be some better error
- // handling around this.
- auto cell_zero = cells.Get(0).value();
- auto cell_one = cells.Get(1).value();
- auto row = cells.Get(0).row();
-
- auto region_info = folly::to<RegionInfo>(cell_zero);
- auto server_name = folly::to<ServerName>(cell_one);
- return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr);
-}
-
// must hold shared lock on locations_lock_
shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb::TableName &tn,
const std::string &row) {
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index f92300c..bd26607 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -24,18 +24,26 @@
#include "connection/request.h"
#include "connection/response.h"
+#include "core/response_converter.h"
#include "if/Client.pb.h"
+#include "serde/region-info.h"
+#include "serde/server-name.h"
#include "serde/table-name.h"
using hbase::pb::TableName;
using hbase::MetaUtil;
using hbase::Request;
using hbase::Response;
+using hbase::RegionLocation;
+using hbase::pb::RegionInfo;
using hbase::pb::ScanRequest;
using hbase::pb::ServerName;
using hbase::pb::RegionSpecifier_RegionSpecifierType;
static const std::string META_REGION = "1588230740";
+static const std::string CATALOG_FAMILY = "info";
+static const std::string REGION_INFO_COLUMN = "regioninfo";
+static const std::string SERVER_COLUMN = "server";
std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const {
return folly::to<std::string>(tn, ",", row, ",", "999999999999999999");
@@ -79,3 +87,29 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::st
scan->set_start_row(RegionLookupRowkey(tn, row));
return request;
}
+
+std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp) {
+ std::vector<std::unique_ptr<Result>> results = ResponseConverter::FromScanResponse(resp);
+ if (results.size() != 1) {
+ throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" +
+ std::to_string(results.size()));
+ }
+
+ auto result = *results[0];
+ // VLOG(1) << "Creating RegionLocation from received Response " << *result; TODO
+
+ std::shared_ptr<std::string> region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN);
+ std::shared_ptr<std::string> server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN);
+
+ if (region_info_str == nullptr) {
+ throw std::runtime_error("regioninfo column null for location");
+ }
+ if (server_str == nullptr) {
+ throw std::runtime_error("server column null for location");
+ }
+
+ auto row = result.Row();
+ auto region_info = folly::to<RegionInfo>(*region_info_str);
+ auto server_name = folly::to<ServerName>(*server_str);
+ return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr);
+}
diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h
index 075215e..d67f32d 100644
--- a/hbase-native-client/core/meta-utils.h
+++ b/hbase-native-client/core/meta-utils.h
@@ -22,6 +22,8 @@
#include <string>
#include "connection/request.h"
+#include "connection/response.h"
+#include "core/region-location.h"
#include "if/HBase.pb.h"
#include "serde/table-name.h"
@@ -43,5 +45,10 @@ class MetaUtil {
* location.
*/
std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn, const std::string &row) const;
+
+ /**
+ * Return a RegionLocation from the parsed Response
+ */
+ std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
};
} // namespace hbase
diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc
index 3fe2ba9..19a3554 100644
--- a/hbase-native-client/core/response_converter.cc
+++ b/hbase-native-client/core/response_converter.cc
@@ -22,9 +22,9 @@
#include <vector>
#include "core/cell.h"
-#include "if/Client.pb.h"
using hbase::pb::GetResponse;
+using hbase::pb::ScanResponse;
namespace hbase {
@@ -32,19 +32,65 @@ ResponseConverter::ResponseConverter() {}
ResponseConverter::~ResponseConverter() {}
-std::unique_ptr<hbase::Result> ResponseConverter::FromGetResponse(const Response& resp) {
+std::unique_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) {
auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg());
+ return ToResult(get_resp->result(), resp.cell_scanner());
+}
+
+std::unique_ptr<Result> ResponseConverter::ToResult(
+ const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) {
std::vector<std::shared_ptr<Cell>> vcells;
- for (auto cell : get_resp->result().cell()) {
+ for (auto cell : result.cell()) {
std::shared_ptr<Cell> pcell =
std::make_shared<Cell>(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(),
cell.value(), static_cast<hbase::CellType>(cell.cell_type()));
vcells.push_back(pcell);
}
- return std::make_unique<hbase::Result>(vcells, get_resp->result().exists(),
- get_resp->result().stale(), get_resp->result().partial());
+ // iterate over the cells coming from rpc codec
+ if (cell_scanner != nullptr) {
+ while (cell_scanner->Advance()) {
+ vcells.push_back(cell_scanner->Current());
+ }
+ // TODO: check associated cell count?
+ }
+ return std::make_unique<Result>(vcells, result.exists(), result.stale(), result.partial());
}
+std::vector<std::unique_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
+ auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
+ VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
+ int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
+ : scan_resp->results_size();
+
+ std::vector<std::unique_ptr<Result>> results{static_cast<size_t>(num_results)};
+ for (int i = 0; i < num_results; i++) {
+ if (resp.cell_scanner() != nullptr) {
+ // Cells are out in cellblocks. Group them up again as Results. How many to read at a
+ // time will be found in getCellsLength -- length here is how many Cells in the i'th Result
+ int num_cells = scan_resp->cells_per_result(i);
+
+ std::vector<std::shared_ptr<Cell>> vcells;
+ while (resp.cell_scanner()->Advance()) {
+ vcells.push_back(resp.cell_scanner()->Current());
+ }
+ // TODO: check associated cell count?
+
+ if (vcells.size() != num_cells) {
+ std::string msg = "Results sent from server=" + std::to_string(num_results) +
+ ". But only got " + std::to_string(i) +
+ " results completely at client. Resetting the scanner to scan again.";
+ LOG(ERROR) << msg;
+ throw std::runtime_error(msg);
+ }
+ // TODO: handle partial results per Result by checking partial_flag_per_result
+ results[i] = std::make_unique<Result>(vcells, false, scan_resp->stale(), false);
+ } else {
+ results[i] = ToResult(scan_resp->results(i), resp.cell_scanner());
+ }
+ }
+
+ return results;
+}
} /* namespace hbase */
diff --git a/hbase-native-client/core/response_converter.h b/hbase-native-client/core/response_converter.h
index 86fb632..859644b 100644
--- a/hbase-native-client/core/response_converter.h
+++ b/hbase-native-client/core/response_converter.h
@@ -22,6 +22,8 @@
#include <memory>
#include "connection/response.h"
#include "core/result.h"
+#include "if/Client.pb.h"
+#include "serde/cell-scanner.h"
namespace hbase {
@@ -33,11 +35,16 @@ class ResponseConverter {
public:
~ResponseConverter();
+ static std::unique_ptr<Result> ToResult(const hbase::pb::Result& result,
+ const std::unique_ptr<CellScanner>& cell_scanner);
+
/**
* @brief Returns a Result object created by PB Message in passed Response object.
* @param resp - Response object having the PB message.
*/
- static std::unique_ptr<hbase::Result> FromGetResponse(const Response &resp);
+ static std::unique_ptr<hbase::Result> FromGetResponse(const Response& resp);
+
+ static std::vector<std::unique_ptr<Result>> FromScanResponse(const Response& resp);
private:
// Constructor not required. We have all static methods to extract response from PB messages.
diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc
index d73a5b2..4db3fca 100644
--- a/hbase-native-client/core/result.cc
+++ b/hbase-native-client/core/result.cc
@@ -87,7 +87,7 @@ bool Result::IsEmpty() const { return cells_.empty(); }
const std::string &Result::Row() const { return row_; }
-const int Result::Size() const { return cells_.size(); }
+int Result::Size() const { return cells_.size(); }
const ResultMap &Result::Map() const { return result_map_; }
diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h
index cd41cf0..8ff4311 100644
--- a/hbase-native-client/core/result.h
+++ b/hbase-native-client/core/result.h
@@ -100,7 +100,7 @@ class Result {
/**
* @brief Returns the size of the underlying Cell vector
*/
- const int Size() const;
+ int Size() const;
/**
* @brief Map of families to all versions of its qualifiers and values.
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 3cd0a93..4b1144c 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -31,6 +31,7 @@
#include "connection/connection-pool.h"
#include "core/client.h"
+#include "core/keyvalue-codec.h"
#include "if/Client.pb.h"
#include "if/ZooKeeper.pb.h"
#include "serde/server-name.h"
@@ -43,6 +44,7 @@ using hbase::Configuration;
using hbase::Response;
using hbase::Request;
using hbase::HBaseService;
+using hbase::KeyValueCodec;
using hbase::LocationCache;
using hbase::ConnectionPool;
using hbase::ConnectionFactory;
@@ -88,7 +90,8 @@ int main(int argc, char *argv[]) {
// Set up thread pools.
auto cpu_pool = std::make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_threads);
auto io_pool = std::make_shared<wangle::IOThreadPoolExecutor>(5);
- auto cp = std::make_shared<ConnectionPool>(io_pool);
+ auto codec = std::make_shared<KeyValueCodec>();
+ auto cp = std::make_shared<ConnectionPool>(io_pool, codec);
// Configuration
auto conf = std::make_shared<Configuration>();
@@ -105,7 +108,7 @@ int main(int argc, char *argv[]) {
auto num_puts = FLAGS_columns;
- auto results = std::vector<Future<Response>>{};
+ auto results = std::vector<Future<std::unique_ptr<Response>>>{};
auto col = uint64_t{0};
for (; col < num_puts; col++) {
results.push_back(
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 58125f9..4e30d4b 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -56,12 +56,12 @@ std::unique_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name());
auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil
- Future<Response> f =
+ Future<std::unique_ptr<Response>> f =
rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
std::move(req), user, "ClientService");
auto resp = f.get();
- return hbase::ResponseConverter::FromGetResponse(resp);
+ return hbase::ResponseConverter::FromGetResponse(*resp);
}
void Table::Close() {
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
index a30f904..054684d 100644
--- a/hbase-native-client/serde/client-deserializer-test.cc
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -30,12 +30,12 @@ using hbase::pb::RegionSpecifier;
using hbase::pb::RegionSpecifier_RegionSpecifierType;
TEST(TestRpcSerde, TestReturnFalseOnNullPtr) {
- RpcSerde deser;
+ RpcSerde deser{nullptr};
ASSERT_LT(deser.ParseDelimited(nullptr, nullptr), 0);
}
TEST(TestRpcSerde, TestReturnFalseOnBadInput) {
- RpcSerde deser;
+ RpcSerde deser{nullptr};
auto buf = IOBuf::copyBuffer("test");
GetRequest gr;
@@ -44,8 +44,8 @@ TEST(TestRpcSerde, TestReturnFalseOnBadInput) {
TEST(TestRpcSerde, TestGoodGetRequestFullRoundTrip) {
GetRequest in;
- RpcSerde ser;
- RpcSerde deser;
+ RpcSerde ser{nullptr};
+ RpcSerde deser{nullptr};
// fill up the GetRequest.
in.mutable_region()->set_value("test_region_id");
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
index 2bd17fb..33c48f3 100644
--- a/hbase-native-client/serde/client-serializer-test.cc
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -32,7 +32,7 @@ using namespace folly;
using namespace folly::io;
TEST(RpcSerdeTest, PreambleIncludesHBas) {
- RpcSerde ser;
+ RpcSerde ser{nullptr};
auto buf = ser.Preamble();
const char *p = reinterpret_cast<const char *>(buf->data());
// Take the first for chars and make sure they are the
@@ -43,14 +43,14 @@ TEST(RpcSerdeTest, PreambleIncludesHBas) {
}
TEST(RpcSerdeTest, PreambleIncludesVersion) {
- RpcSerde ser;
+ RpcSerde ser{nullptr};
auto buf = ser.Preamble();
EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]);
EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]);
}
TEST(RpcSerdeTest, TestHeaderLengthPrefixed) {
- RpcSerde ser;
+ RpcSerde ser{nullptr};
auto header = ser.Header("elliott");
// The header should be prefixed by 4 bytes of length.
@@ -65,7 +65,7 @@ TEST(RpcSerdeTest, TestHeaderLengthPrefixed) {
}
TEST(RpcSerdeTest, TestHeaderDecode) {
- RpcSerde ser;
+ RpcSerde ser{nullptr};
auto buf = ser.Header("elliott");
auto header_buf = buf->next();
ConnectionHeader h;
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc
index 3bdb489..d5bca62 100644
--- a/hbase-native-client/serde/rpc.cc
+++ b/hbase-native-client/serde/rpc.cc
@@ -83,7 +83,7 @@ int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
return coded_stream.CurrentPosition();
}
-RpcSerde::RpcSerde() : auth_type_(DEFAULT_AUTH_TYPE) {}
+RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : auth_type_(DEFAULT_AUTH_TYPE), codec_(codec) {}
unique_ptr<IOBuf> RpcSerde::Preamble() {
auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
@@ -110,6 +110,10 @@ unique_ptr<IOBuf> RpcSerde::Header(const string &user) {
// didn't.
// TODO: send the service name and user from the RpcClient
h.set_service_name(INTERFACE);
+
+ if (codec_ != nullptr) {
+ h.set_cell_block_codec_class(codec_->java_class_name());
+ }
return PrependLength(SerializeMessage(h));
}
@@ -128,6 +132,14 @@ unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const string &method
return PrependLength(std::move(ser_header));
}
+std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
+ uint32_t offset, uint32_t length) {
+ if (codec_ == nullptr) {
+ return nullptr;
+ }
+ return codec_->CreateDecoder(std::move(buf), offset, length);
+}
+
unique_ptr<IOBuf> RpcSerde::PrependLength(unique_ptr<IOBuf> msg) {
// Java ints are 4 long. So create a buffer that large
auto len_buf = IOBuf::create(4);
diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc.h
index 7d060c7..c59f903 100644
--- a/hbase-native-client/serde/rpc.h
+++ b/hbase-native-client/serde/rpc.h
@@ -21,6 +21,9 @@
#include <memory>
#include <string>
+#include "serde/cell-scanner.h"
+#include "serde/codec.h"
+
// Forward
namespace folly {
class IOBuf;
@@ -44,7 +47,7 @@ class RpcSerde {
/**
* Constructor assumes the default auth type.
*/
- RpcSerde();
+ RpcSerde(std::shared_ptr<Codec> codec);
/**
* Destructor. This is provided just for testing purposes.
@@ -76,6 +79,13 @@ class RpcSerde {
std::unique_ptr<folly::IOBuf> Header(const std::string &user);
/**
+ * Take ownership of the passed buffer, and create a CellScanner using the
+ * Codec class to parse Cells out of the wire.
+ */
+ std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset,
+ uint32_t length);
+
+ /**
* Serialize a request message into a protobuf.
* Request consists of:
*
@@ -109,5 +119,6 @@ class RpcSerde {
private:
/* data */
uint8_t auth_type_;
+ std::shared_ptr<Codec> codec_;
};
} // namespace hbase