You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2016/07/11 23:48:31 UTC
[38/50] [abbrv] hbase git commit: HBASE-15750 Add on meta deserialization
HBASE-15750 Add on meta deserialization
Summary: Add on meta region info deserialization
Test Plan:
Unit tests.
Simple client connects.
Differential Revision: https://reviews.facebook.net/D57555
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/190fc7aa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/190fc7aa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/190fc7aa
Branch: refs/heads/HBASE-14850
Commit: 190fc7aae288c97cb23e8cc130dbe7774917cedb
Parents: 50d1672
Author: Elliott Clark <ec...@apache.org>
Authored: Tue May 3 12:17:07 2016 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Mon Jul 11 16:47:26 2016 -0700
----------------------------------------------------------------------
.../connection/client-dispatcher.cc | 1 +
.../connection/client-handler.cc | 13 +-
hbase-native-client/connection/client-handler.h | 6 +-
.../connection/connection-pool.cc | 36 +---
.../connection/connection-pool.h | 8 +-
hbase-native-client/core/BUCK | 4 -
hbase-native-client/core/location-cache-test.cc | 1 +
hbase-native-client/core/location-cache.cc | 73 +++++++-
hbase-native-client/core/location-cache.h | 7 +-
hbase-native-client/core/meta-utils.cc | 9 +-
hbase-native-client/core/meta-utils.h | 7 +-
hbase-native-client/core/region-location.h | 5 +-
hbase-native-client/core/simple-client.cc | 6 +-
hbase-native-client/core/table-name-test.cc | 54 ------
hbase-native-client/core/table-name.h | 50 -----
hbase-native-client/serde/BUCK | 56 +++---
.../serde/client-deserializer-test.cc | 25 ++-
.../serde/client-deserializer.cc | 68 -------
hbase-native-client/serde/client-deserializer.h | 36 ----
.../serde/client-serializer-test.cc | 26 +--
hbase-native-client/serde/client-serializer.cc | 139 --------------
hbase-native-client/serde/client-serializer.h | 55 ------
.../serde/region-info-deserializer-test.cc | 54 ++++++
hbase-native-client/serde/region-info.h | 41 +++++
hbase-native-client/serde/rpc.cc | 181 +++++++++++++++++++
hbase-native-client/serde/rpc.h | 58 ++++++
hbase-native-client/serde/server-name-test.cc | 32 ++++
hbase-native-client/serde/server-name.h | 21 +++
hbase-native-client/serde/table-name-test.cc | 54 ++++++
hbase-native-client/serde/table-name.h | 54 ++++++
.../serde/zk-deserializer-test.cc | 8 +-
hbase-native-client/serde/zk-deserializer.cc | 78 --------
hbase-native-client/serde/zk-deserializer.h | 35 ----
hbase-native-client/serde/zk.cc | 78 ++++++++
hbase-native-client/serde/zk.h | 35 ++++
35 files changed, 761 insertions(+), 653 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/connection/client-dispatcher.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index 817adc1..6e2dc54 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -44,6 +44,7 @@ Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
auto &p = requests_[call_id];
auto f = p.getFuture();
p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
+ LOG(ERROR) << "e = " << call_id;
this->requests_.erase(call_id);
});
this->pipeline_->write(std::move(arg));
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 3180f4e..496e4f2 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -37,8 +37,7 @@ using hbase::pb::GetResponse;
using google::protobuf::Message;
ClientHandler::ClientHandler(std::string user_name)
- : user_name_(user_name), need_send_header_(true), ser_(), deser_(),
- resp_msgs_() {}
+ : user_name_(user_name), need_send_header_(true), serde_(), resp_msgs_() {}
void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
if (LIKELY(buf != nullptr)) {
@@ -46,7 +45,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
Response received;
ResponseHeader header;
- int used_bytes = deser_.parse_delimited(buf.get(), &header);
+ int used_bytes = serde_.ParseDelimited(buf.get(), &header);
LOG(INFO) << "Read ResponseHeader size=" << used_bytes
<< " call_id=" << header.call_id()
<< " has_exception=" << header.has_exception();
@@ -70,7 +69,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
// data left on the wire.
if (header.has_exception() == false) {
buf->trimStart(used_bytes);
- used_bytes = deser_.parse_delimited(buf.get(), resp_msg.get());
+ used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get());
// Make sure that bytes were parsed.
CHECK(used_bytes == buf->length());
received.set_response(resp_msg);
@@ -91,13 +90,13 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
// and one for the request.
//
// That doesn't seem like too bad, but who knows.
- auto pre = ser_.preamble();
- auto header = ser_.header(user_name_);
+ auto pre = serde_.Preamble();
+ auto header = serde_.Header(user_name_);
pre->appendChain(std::move(header));
ctx->fireWrite(std::move(pre));
}
resp_msgs_[r->call_id()] = r->resp_msg();
return ctx->fireWrite(
- ser_.request(r->call_id(), r->method(), r->req_msg().get()));
+ serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/connection/client-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index 68513de..ce99c9e 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -22,8 +22,7 @@
#include <string>
-#include "serde/client-deserializer.h"
-#include "serde/client-serializer.h"
+#include "serde/rpc.h"
// Forward decs.
namespace hbase {
@@ -49,8 +48,7 @@ public:
private:
bool need_send_header_;
std::string user_name_;
- ClientSerializer ser_;
- ClientDeserializer deser_;
+ RpcSerde serde_;
// in flight requests
std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>>
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 72c1306..eafe60a 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -25,41 +25,10 @@ using std::mutex;
using std::unique_ptr;
using std::shared_ptr;
using hbase::pb::ServerName;
-using wangle::ServiceFilter;
using folly::SharedMutexWritePriority;
namespace hbase {
-class RemoveServiceFilter
- : public ServiceFilter<unique_ptr<Request>, Response> {
-
-public:
- RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn,
- ConnectionPool &cp)
- : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn),
- cp_(cp) {}
-
- folly::Future<folly::Unit> close() override {
- if (!released.exchange(true)) {
- return this->service_->close().then(
- [this]() { this->cp_.close(this->sn_); });
- } else {
- return folly::makeFuture();
- }
- }
-
- virtual bool isAvailable() override { return service_->isAvailable(); }
-
- folly::Future<Response> operator()(unique_ptr<Request> req) override {
- return (*this->service_)(std::move(req));
- }
-
-private:
- std::atomic<bool> released{false};
- hbase::pb::ServerName sn_;
- ConnectionPool &cp_;
-};
-
ConnectionPool::ConnectionPool()
: cf_(std::make_shared<ConnectionFactory>()), connections_(), map_mutex_() {
}
@@ -72,13 +41,12 @@ std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) {
if (found == connections_.end() || found->second == nullptr) {
SharedMutexWritePriority::WriteHolder holder(std::move(holder));
auto new_con = cf_->make_connection(sn.host_name(), sn.port());
- auto wrapped = std::make_shared<RemoveServiceFilter>(new_con, sn, *this);
- connections_[sn] = wrapped;
+ connections_[sn] = new_con;
return new_con;
}
return found->second;
}
-void ConnectionPool::close(ServerName sn) {
+void ConnectionPool::close(const ServerName &sn) {
SharedMutexWritePriority::WriteHolder holder(map_mutex_);
auto found = connections_.find(sn);
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/connection/connection-pool.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 394cd71..b8330e3 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -27,13 +27,13 @@
#include "if/HBase.pb.h"
namespace hbase {
-struct MyServerNameEquals {
+struct ServerNameEquals {
bool operator()(const hbase::pb::ServerName &lhs,
const hbase::pb::ServerName &rhs) const {
return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port();
}
};
-struct MyServerNameHash {
+struct ServerNameHash {
std::size_t operator()(hbase::pb::ServerName const &s) const {
std::size_t h1 = std::hash<std::string>()(s.host_name());
std::size_t h2 = std::hash<uint32_t>()(s.port());
@@ -46,12 +46,12 @@ public:
ConnectionPool();
explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf);
std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn);
- void close(hbase::pb::ServerName sn);
+ void close(const hbase::pb::ServerName &sn);
private:
std::shared_ptr<ConnectionFactory> cf_;
std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>,
- MyServerNameHash, MyServerNameEquals>
+ ServerNameHash, ServerNameEquals>
connections_;
folly::SharedMutexWritePriority map_mutex_;
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 447248b..ef8c2f8 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -24,7 +24,6 @@ cxx_library(
"hbase_macros.h",
"region-location.h",
"location-cache.h",
- "table-name.h",
# TODO: move this out of exported
# Once meta lookup works
"meta-utils.h",
@@ -53,9 +52,6 @@ cxx_test(name="location-cache-test",
],
deps=[":core", ],
run_test_separately=True, )
-cxx_test(name="table-name-test",
- srcs=["table-name-test.cc", ],
- deps=[":core", ], )
cxx_binary(name="simple-client",
srcs=["simple-client.cc", ],
deps=[":core", "//connection:connection"], )
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/location-cache-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index f3166fb..172799d 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -30,4 +30,5 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) {
auto result = f.get();
ASSERT_FALSE(f.hasException());
ASSERT_TRUE(result.has_port());
+ ASSERT_TRUE(result.has_host_name());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 539051a..2667f11 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -20,19 +20,25 @@
#include <folly/Logging.h>
#include <folly/io/IOBuf.h>
+#include <wangle/concurrent/GlobalExecutor.h>
#include "connection/response.h"
#include "if/Client.pb.h"
#include "if/ZooKeeper.pb.h"
-#include "serde/zk-deserializer.h"
+#include "serde/server-name.h"
+#include "serde/region-info.h"
+#include "serde/zk.h"
using namespace std;
using namespace folly;
+using wangle::ServiceFilter;
+using hbase::Request;
using hbase::Response;
using hbase::LocationCache;
using hbase::RegionLocation;
using hbase::HBaseService;
+using hbase::ConnectionPool;
using hbase::pb::ScanResponse;
using hbase::pb::TableName;
using hbase::pb::ServerName;
@@ -45,7 +51,7 @@ static const char META_ZNODE_NAME[] = "/hbase/meta-region-server";
LocationCache::LocationCache(string quorum_spec,
shared_ptr<folly::Executor> executor)
: quorum_spec_(quorum_spec), executor_(executor), meta_promise_(nullptr),
- meta_lock_(), cp_(), meta_util_() {
+ meta_lock_(), cp_(), meta_util_(), zk_(nullptr) {
zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0);
}
@@ -95,28 +101,77 @@ ServerName LocationCache::ReadMetaLocation() {
buf->append(len);
MetaRegionServer mrs;
- if (derser.parse(buf.get(), &mrs) == false) {
+ if (derser.Parse(buf.get(), &mrs) == false) {
LOG(ERROR) << "Unable to decode";
}
return mrs.server();
}
-Future<RegionLocation> LocationCache::locateFromMeta(const TableName &tn,
- const string &row) {
+Future<std::shared_ptr<RegionLocation>>
+LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
+ auto exc = wangle::getIOExecutor();
return this->LocateMeta()
.then([&](ServerName sn) { return this->cp_.get(sn); })
+ .via(exc.get()) // Need to handle all rpc's on the IOExecutor.
.then([&](std::shared_ptr<HBaseService> service) {
- return (*service)(std::move(meta_util_.make_meta_request(tn, row)));
+ return (*service)(std::move(meta_util_.MetaRequest(tn, row)));
})
.then([&](Response resp) {
// take the protobuf response and make it into
// a region location.
- return this->parse_response(std::move(resp));
+ return this->CreateLocation(std::move(resp));
});
}
-RegionLocation LocationCache::parse_response(const Response &resp) {
+class RemoveServiceFilter
+ : public ServiceFilter<std::unique_ptr<Request>, Response> {
+
+public:
+ RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn,
+ ConnectionPool &cp)
+ : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn),
+ cp_(cp) {}
+
+ folly::Future<folly::Unit> close() override {
+ if (!released.exchange(true)) {
+ return this->service_->close().then([this]() {
+ // TODO(eclark): remove the service from the meta cache.
+ this->cp_.close(this->sn_);
+ });
+ } else {
+ return folly::makeFuture();
+ }
+ }
+
+ virtual bool isAvailable() override {
+ return !released && service_->isAvailable();
+ }
+
+ folly::Future<Response> operator()(unique_ptr<Request> req) override {
+ // TODO(eclark): add in an on error handler that will
+ // remove the region location from the cache if needed.
+ // Also close the connection if this is likely to be an error
+ // that needs to get a new connection.
+ return (*this->service_)(std::move(req));
+ }
+
+private:
+ std::atomic<bool> released{false};
+ hbase::pb::ServerName sn_;
+ ConnectionPool &cp_;
+};
+
+std::shared_ptr<RegionLocation>
+LocationCache::CreateLocation(const Response &resp){
auto resp_msg = static_pointer_cast<ScanResponse>(resp.response());
+ auto &results = resp_msg->results().Get(0);
+ auto &cells = results.cell();
LOG(ERROR) << "resp_msg = " << resp_msg->DebugString();
- return RegionLocation{RegionInfo{}, ServerName{}, nullptr};
+ auto ri = folly::to<RegionInfo>(cells.Get(0).value());
+ auto sn = folly::to<ServerName>(cells.Get(1).value());
+
+ LOG(ERROR) << "RegionInfo = " << ri.DebugString();
+ LOG(ERROR) << "ServerName = " << sn.DebugString();
+ auto wrapped = make_shared<RemoveServiceFilter>(cp_.get(sn), sn, this->cp_);
+ return std::make_shared<RegionLocation>(std::move(ri), std::move(sn), wrapped);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/location-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index cfd6838..99b5e5e 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -29,8 +29,8 @@
#include "connection/connection-pool.h"
#include "core/meta-utils.h"
-#include "core/table-name.h"
#include "core/region-location.h"
+#include "serde/table-name.h"
namespace hbase {
@@ -48,14 +48,14 @@ public:
// Meta Related Methods.
// These are only public until testing is complete
folly::Future<hbase::pb::ServerName> LocateMeta();
- folly::Future<RegionLocation> locateFromMeta(const hbase::pb::TableName &tn,
+ folly::Future<std::shared_ptr<RegionLocation>> LocateFromMeta(const hbase::pb::TableName &tn,
const std::string &row);
- RegionLocation parse_response(const Response &resp);
void InvalidateMeta();
private:
void RefreshMetaLocation();
hbase::pb::ServerName ReadMetaLocation();
+ std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
std::string quorum_spec_;
std::shared_ptr<folly::Executor> executor_;
@@ -64,7 +64,6 @@ private:
ConnectionPool cp_;
MetaUtil meta_util_;
-
// TODO: migrate this to a smart pointer with a deleter.
zhandle_t *zk_;
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/meta-utils.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index d2fdd88..1325d83 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -23,25 +23,26 @@
#include "connection/request.h"
#include "connection/response.h"
-#include "core/table-name.h"
#include "if/Client.pb.h"
+#include "serde/table-name.h"
using hbase::pb::TableName;
using hbase::MetaUtil;
using hbase::Request;
using hbase::Response;
using hbase::pb::ScanRequest;
+using hbase::pb::ServerName;
using hbase::pb::RegionSpecifier_RegionSpecifierType;
static const std::string META_REGION = "1588230740";
-std::string MetaUtil::region_lookup_rowkey(const TableName &tn,
+std::string MetaUtil::RegionLookupRowkey(const TableName &tn,
const std::string &row) const {
return folly::to<std::string>(tn, ",", row, ",", "999999999999999999");
}
std::unique_ptr<Request>
-MetaUtil::make_meta_request(const TableName tn, const std::string &row) const {
+MetaUtil::MetaRequest(const TableName tn, const std::string &row) const {
auto request = Request::scan();
auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
@@ -76,6 +77,6 @@ MetaUtil::make_meta_request(const TableName tn, const std::string &row) const {
info_col->add_qualifier("server");
info_col->add_qualifier("regioninfo");
- scan->set_start_row(region_lookup_rowkey(tn, row));
+ scan->set_start_row(RegionLookupRowkey(tn, row));
return request;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/meta-utils.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h
index e007d02..5a659f3 100644
--- a/hbase-native-client/core/meta-utils.h
+++ b/hbase-native-client/core/meta-utils.h
@@ -21,15 +21,16 @@
#include <string>
#include "connection/Request.h"
-#include "core/table-name.h"
+#include "if/HBase.pb.h"
+#include "serde/table-name.h"
namespace hbase {
class MetaUtil {
public:
- std::string region_lookup_rowkey(const hbase::pb::TableName &tn,
+ std::string RegionLookupRowkey(const hbase::pb::TableName &tn,
const std::string &row) const;
- std::unique_ptr<Request> make_meta_request(const hbase::pb::TableName tn,
+ std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn,
const std::string &row) const;
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/region-location.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index a46b8e2..7922c95 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -23,7 +23,6 @@
#include "connection/service.h"
#include "if/HBase.pb.h"
-
namespace hbase {
class RegionLocation {
@@ -32,8 +31,8 @@ public:
std::shared_ptr<HBaseService> service)
: ri_(ri), sn_(sn), service_(service) {}
- const hbase::pb::RegionInfo& region_info() { return ri_; }
- const hbase::pb::ServerName& server_name() { return sn_; }
+ const hbase::pb::RegionInfo &region_info() { return ri_; }
+ const hbase::pb::ServerName &server_name() { return sn_; }
std::shared_ptr<HBaseService> service() { return service_; }
private:
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index ab614e4..00e3369 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -27,9 +27,9 @@
#include "connection/connection-pool.h"
#include "core/client.h"
-#include "core/table-name.h"
#include "if/Client.pb.h"
#include "if/ZooKeeper.pb.h"
+#include "serde/table-name.h"
using namespace folly;
using namespace std;
@@ -39,7 +39,7 @@ using hbase::Request;
using hbase::HBaseService;
using hbase::LocationCache;
using hbase::ConnectionPool;
-using hbase::TableNameUtil;
+using hbase::pb::TableName;
using hbase::pb::ServerName;
using hbase::pb::RegionSpecifier_RegionSpecifierType;
using hbase::pb::GetRequest;
@@ -61,7 +61,7 @@ int main(int argc, char *argv[]) {
auto cpu_ex = wangle::getCPUExecutor();
LocationCache cache{FLAGS_zookeeper, cpu_ex};
auto result =
- cache.locateFromMeta(TableNameUtil::create(FLAGS_table), FLAGS_row)
+ cache.LocateFromMeta(folly::to<TableName>(FLAGS_table), FLAGS_row)
.get(milliseconds(5000));
return 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/table-name-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table-name-test.cc b/hbase-native-client/core/table-name-test.cc
deleted file mode 100644
index 7bad3f1..0000000
--- a/hbase-native-client/core/table-name-test.cc
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <folly/Conv.h>
-#include <gtest/gtest.h>
-
-#include <string>
-
-#include "core/table-name.h"
-
-using namespace hbase;
-using hbase::pb::TableName;
-
-TEST(TestTableName, TestToStringNoDefault) {
- TableName tn;
- tn.set_qualifier("TestTableName");
- std::string result = folly::to<std::string>(tn);
- ASSERT_EQ(result.find("default"), std::string::npos);
- ASSERT_EQ("TestTableName", result);
-}
-
-TEST(TestTableName, TestToStringNoDefaltWhenSet) {
- TableName tn;
- tn.set_namespace_("default");
- tn.set_qualifier("TestTableName");
- std::string result = folly::to<std::string>(tn);
- ASSERT_EQ(result.find("default"), std::string::npos);
- ASSERT_EQ("TestTableName", result);
-}
-
-TEST(TestTableName, TestToStringIncludeNS) {
- TableName tn;
- tn.set_namespace_("hbase");
- tn.set_qualifier("acl");
- std::string result = folly::to<std::string>(tn);
- ASSERT_EQ(result.find("hbase"), 0);
- ASSERT_EQ("hbase:acl", result);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/core/table-name.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table-name.h b/hbase-native-client/core/table-name.h
deleted file mode 100644
index 1612667..0000000
--- a/hbase-native-client/core/table-name.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <memory>
-#include <string>
-
-#include "if/HBase.pb.h"
-#include <folly/Conv.h>
-
-namespace hbase {
-namespace pb {
-
-// Provide folly::to<std::string>(TableName);
-template <class String> void toAppend(const TableName &in, String *result) {
- if (!in.has_namespace_() || in.namespace_() == "default") {
- folly::toAppend(in.qualifier(), result);
- } else {
- folly::toAppend(in.namespace_(), ':', in.qualifier(), result);
- }
-}
-
-} // namespace pb
-
-class TableNameUtil {
-public:
- static ::hbase::pb::TableName create(std::string table_name) {
- ::hbase::pb::TableName tn;
- tn.set_namespace_("default");
- tn.set_qualifier(table_name);
- return tn;
- }
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
index 539a221..db15026 100644
--- a/hbase-native-client/serde/BUCK
+++ b/hbase-native-client/serde/BUCK
@@ -17,49 +17,47 @@
cxx_library(name="serde",
exported_headers=[
- "client-serializer.h",
- "client-deserializer.h",
- "zk-deserializer.h",
+ "region-info.h",
+ "rpc.h",
+ "server-name.h",
+ "table-name.h",
+ "zk.h",
],
srcs=[
- "client-serializer.cc",
- "client-deserializer.cc",
- "zk-deserializer.cc",
+ "rpc.cc",
+ "zk.cc",
],
deps=[
"//if:if",
"//third-party:folly",
],
tests=[
- ":client-serializer-test",
":client-deserializer-test",
+ ":client-serializer-test",
+ ":server-name-test",
+ ":table-name-test",
+ ":zk-deserializer-test",
+ ":region-info-deserializer-test",
],
compiler_flags=['-Weffc++'],
visibility=[
'PUBLIC',
], )
-
+cxx_test(name="table-name-test",
+ srcs=["table-name-test.cc", ],
+ deps=[":serde", ], )
+cxx_test(name="server-name-test",
+ srcs=["server-name-test.cc", ],
+ deps=[":serde", ], )
cxx_test(name="client-serializer-test",
- srcs=[
- "client-serializer-test.cc",
- ],
- deps=[
- ":serde",
- "//if:if",
- ], )
+ srcs=["client-serializer-test.cc", ],
+ deps=[":serde", ], )
cxx_test(name="client-deserializer-test",
- srcs=[
- "client-deserializer-test.cc",
- ],
- deps=[
- ":serde",
- "//if:if",
- ], )
+ srcs=["client-deserializer-test.cc", ],
+ deps=[":serde", ], )
cxx_test(name="zk-deserializer-test",
- srcs=[
- "zk-deserializer-test.cc",
- ],
- deps=[
- ":serde",
- "//if:if",
- ], )
+ srcs=["zk-deserializer-test.cc", ],
+ deps=[":serde", ], )
+cxx_test(name="region-info-deserializer-test",
+ srcs=["region-info-deserializer-test.cc", ],
+ deps=[":serde", ], )
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/client-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
index 9fef093..8c571b1 100644
--- a/hbase-native-client/serde/client-deserializer-test.cc
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -16,13 +16,12 @@
* limitations under the License.
*
*/
+#include "serde/rpc.h"
#include <folly/io/IOBuf.h>
#include <gtest/gtest.h>
#include "if/Client.pb.h"
-#include "serde/client-deserializer.h"
-#include "serde/client-serializer.h"
using namespace hbase;
using folly::IOBuf;
@@ -30,23 +29,23 @@ using hbase::pb::GetRequest;
using hbase::pb::RegionSpecifier;
using hbase::pb::RegionSpecifier_RegionSpecifierType;
-TEST(TestClientDeserializer, TestReturnFalseOnNullPtr) {
- ClientDeserializer deser;
- ASSERT_LT(deser.parse_delimited(nullptr, nullptr), 0);
+TEST(TestRpcSerde, TestReturnFalseOnNullPtr) {
+ RpcSerde deser;
+ ASSERT_LT(deser.ParseDelimited(nullptr, nullptr), 0);
}
-TEST(TestClientDeserializer, TestReturnFalseOnBadInput) {
- ClientDeserializer deser;
+TEST(TestRpcSerde, TestReturnFalseOnBadInput) {
+ RpcSerde deser;
auto buf = IOBuf::copyBuffer("test");
GetRequest gr;
- ASSERT_LT(deser.parse_delimited(buf.get(), &gr), 0);
+ ASSERT_LT(deser.ParseDelimited(buf.get(), &gr), 0);
}
-TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) {
+TEST(TestRpcSerde, TestGoodGetRequestFullRoundTrip) {
GetRequest in;
- ClientSerializer ser;
- ClientDeserializer deser;
+ RpcSerde ser;
+ RpcSerde deser;
// fill up the GetRequest.
in.mutable_region()->set_value("test_region_id");
@@ -56,11 +55,11 @@ TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) {
in.mutable_get()->set_row("test_row");
// Create the buffer
- auto buf = ser.serialize_delimited(in);
+ auto buf = ser.SerializeDelimited(in);
GetRequest out;
- int used_bytes = deser.parse_delimited(buf.get(), &out);
+ int used_bytes = deser.ParseDelimited(buf.get(), &out);
ASSERT_GT(used_bytes, 0);
ASSERT_EQ(used_bytes, buf->length());
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/client-deserializer.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer.cc b/hbase-native-client/serde/client-deserializer.cc
deleted file mode 100644
index acca7ea..0000000
--- a/hbase-native-client/serde/client-deserializer.cc
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "serde/client-deserializer.h"
-
-#include <folly/Logging.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/message.h>
-
-using namespace hbase;
-
-using folly::IOBuf;
-using google::protobuf::Message;
-using google::protobuf::io::ArrayInputStream;
-using google::protobuf::io::CodedInputStream;
-
-int ClientDeserializer::parse_delimited(const IOBuf *buf, Message *msg) {
- if (buf == nullptr || msg == nullptr) {
- return -2;
- }
-
- DCHECK(!buf->isChained());
-
- ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())};
- CodedInputStream coded_stream{&ais};
-
- uint32_t msg_size;
-
- // Try and read the varint.
- if (coded_stream.ReadVarint32(&msg_size) == false) {
- FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
- return -3;
- }
-
- coded_stream.PushLimit(msg_size);
- // Parse the message.
- if (msg->MergeFromCodedStream(&coded_stream) == false) {
- FB_LOG_EVERY_MS(ERROR, 1000)
- << "Unable to read a protobuf message from data.";
- return -4;
- }
-
- // Make sure all the data was consumed.
- if (coded_stream.ConsumedEntireMessage() == false) {
- FB_LOG_EVERY_MS(ERROR, 1000)
- << "Orphaned data left after reading protobuf message";
- return -5;
- }
-
- return coded_stream.CurrentPosition();
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/client-deserializer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer.h b/hbase-native-client/serde/client-deserializer.h
deleted file mode 100644
index b9664b0..0000000
--- a/hbase-native-client/serde/client-deserializer.h
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/io/IOBuf.h>
-
-// Forward
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-namespace hbase {
-class ClientDeserializer {
-public:
- int parse_delimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
-};
-
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/client-serializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
index 9bf38af..2bd17fb 100644
--- a/hbase-native-client/serde/client-serializer-test.cc
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -24,16 +24,16 @@
#include "if/HBase.pb.h"
#include "if/RPC.pb.h"
-#include "serde/client-serializer.h"
+#include "serde/rpc.h"
using namespace hbase;
using namespace hbase::pb;
using namespace folly;
using namespace folly::io;
-TEST(ClientSerializerTest, PreambleIncludesHBas) {
- ClientSerializer ser;
- auto buf = ser.preamble();
+TEST(RpcSerdeTest, PreambleIncludesHBas) {
+ RpcSerde ser;
+ auto buf = ser.Preamble();
const char *p = reinterpret_cast<const char *>(buf->data());
// Take the first for chars and make sure they are the
// magic string
@@ -42,16 +42,16 @@ TEST(ClientSerializerTest, PreambleIncludesHBas) {
EXPECT_EQ(6, buf->computeChainDataLength());
}
-TEST(ClientSerializerTest, PreambleIncludesVersion) {
- ClientSerializer ser;
- auto buf = ser.preamble();
+TEST(RpcSerdeTest, PreambleIncludesVersion) {
+ RpcSerde ser;
+ auto buf = ser.Preamble();
EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]);
EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]);
}
-TEST(ClientSerializerTest, TestHeaderLengthPrefixed) {
- ClientSerializer ser;
- auto header = ser.header("elliott");
+TEST(RpcSerdeTest, TestHeaderLengthPrefixed) {
+ RpcSerde ser;
+ auto header = ser.Header("elliott");
// The header should be prefixed by 4 bytes of length.
EXPECT_EQ(4, header->length());
@@ -64,9 +64,9 @@ TEST(ClientSerializerTest, TestHeaderLengthPrefixed) {
EXPECT_EQ(prefixed_len, header->next()->length());
}
-TEST(ClientSerializerTest, TestHeaderDecode) {
- ClientSerializer ser;
- auto buf = ser.header("elliott");
+TEST(RpcSerdeTest, TestHeaderDecode) {
+ RpcSerde ser;
+ auto buf = ser.Header("elliott");
auto header_buf = buf->next();
ConnectionHeader h;
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/client-serializer.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer.cc b/hbase-native-client/serde/client-serializer.cc
deleted file mode 100644
index 09b81c8..0000000
--- a/hbase-native-client/serde/client-serializer.cc
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "serde/client-serializer.h"
-
-#include <folly/Logging.h>
-#include <folly/io/Cursor.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-
-#include "if/HBase.pb.h"
-#include "if/RPC.pb.h"
-
-using namespace hbase;
-
-using folly::IOBuf;
-using folly::io::RWPrivateCursor;
-using google::protobuf::Message;
-using google::protobuf::io::ArrayOutputStream;
-using google::protobuf::io::CodedOutputStream;
-using google::protobuf::io::ZeroCopyOutputStream;
-using std::string;
-using std::unique_ptr;
-
-static const std::string PREAMBLE = "HBas";
-static const std::string INTERFACE = "ClientService";
-static const uint8_t RPC_VERSION = 0;
-static const uint8_t DEFAULT_AUTH_TYPE = 80;
-
-ClientSerializer::ClientSerializer() : auth_type_(DEFAULT_AUTH_TYPE) {}
-
-unique_ptr<IOBuf> ClientSerializer::preamble() {
- auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
- magic->append(2);
- RWPrivateCursor c(magic.get());
- c.skip(4);
- // Version
- c.write(RPC_VERSION);
- // Standard security aka Please don't lie to me.
- c.write(auth_type_);
- return magic;
-}
-
-unique_ptr<IOBuf> ClientSerializer::header(const string &user) {
- pb::ConnectionHeader h;
-
- // TODO(eclark): Make this not a total lie.
- h.mutable_user_info()->set_effective_user(user);
- // The service name that we want to talk to.
- //
- // Right now we're completely ignoring the service interface.
- // That may or may not be the correct thing to do.
- // It worked for a while with the java client; until it
- // didn't.
- h.set_service_name(INTERFACE);
- return prepend_length(serialize_message(h));
-}
-
-unique_ptr<IOBuf> ClientSerializer::request(const uint32_t call_id,
- const string &method,
- const Message *msg) {
- pb::RequestHeader rq;
- rq.set_method_name(method);
- rq.set_call_id(call_id);
- rq.set_request_param(msg != nullptr);
- auto ser_header = serialize_delimited(rq);
- if (msg != nullptr) {
- auto ser_req = serialize_delimited(*msg);
- ser_header->appendChain(std::move(ser_req));
- }
-
- return prepend_length(std::move(ser_header));
-}
-
-unique_ptr<IOBuf> ClientSerializer::prepend_length(unique_ptr<IOBuf> msg) {
- // Java ints are 4 long. So create a buffer that large
- auto len_buf = IOBuf::create(4);
- // Then make those bytes visible.
- len_buf->append(4);
-
- RWPrivateCursor c(len_buf.get());
- // Get the size of the data to be pushed out the network.
- auto size = msg->computeChainDataLength();
-
- // Write the length to this IOBuf.
- c.writeBE(static_cast<uint32_t>(size));
-
- // Then attach the origional to the back of len_buf
- len_buf->appendChain(std::move(msg));
- return len_buf;
-}
-
-unique_ptr<IOBuf> ClientSerializer::serialize_delimited(const Message &msg) {
- // Get the buffer size needed for just the message.
- int msg_size = msg.ByteSize();
- int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
-
- // Create a buffer big enough to hold the varint and the object.
- auto buf = IOBuf::create(buf_size);
- buf->append(buf_size);
-
- // Create the array output stream.
- ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
- // Wrap the ArrayOuputStream in the coded output stream to allow writing
- // Varint32
- CodedOutputStream cos{&aos};
-
- // Write out the size.
- cos.WriteVarint32(msg_size);
-
- // Now write the rest out.
- // We're using the protobuf output streams here to keep track
- // of where in the output array we are rather than IOBuf.
- msg.SerializeWithCachedSizesToArray(
- cos.GetDirectBufferForNBytesAndAdvance(msg_size));
-
- // Return the buffer.
- return buf;
-}
-// TODO(eclark): Make this 1 copy.
-unique_ptr<IOBuf> ClientSerializer::serialize_message(const Message &msg) {
- auto buf = IOBuf::copyBuffer(msg.SerializeAsString());
- return buf;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/client-serializer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer.h b/hbase-native-client/serde/client-serializer.h
deleted file mode 100644
index 9c819fe..0000000
--- a/hbase-native-client/serde/client-serializer.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <cstdint>
-#include <folly/io/IOBuf.h>
-#include <string>
-
-// Forward
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-namespace hbase {
-class Request;
-}
-
-namespace hbase {
-class ClientSerializer {
-public:
- ClientSerializer();
- std::unique_ptr<folly::IOBuf> preamble();
- std::unique_ptr<folly::IOBuf> header(const std::string &user);
- std::unique_ptr<folly::IOBuf> request(const uint32_t call_id,
- const std::string &method,
- const google::protobuf::Message *msg);
- std::unique_ptr<folly::IOBuf>
- serialize_delimited(const google::protobuf::Message &msg);
-
- std::unique_ptr<folly::IOBuf>
- serialize_message(const google::protobuf::Message &msg);
-
- std::unique_ptr<folly::IOBuf>
- prepend_length(std::unique_ptr<folly::IOBuf> msg);
-
- uint8_t auth_type_;
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/region-info-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info-deserializer-test.cc b/hbase-native-client/serde/region-info-deserializer-test.cc
new file mode 100644
index 0000000..ce8dedf
--- /dev/null
+++ b/hbase-native-client/serde/region-info-deserializer-test.cc
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "serde/region-info.h"
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "if/HBase.pb.h"
+#include "serde/table-name.h"
+
+using std::string;
+using hbase::pb::RegionInfo;
+using hbase::pb::TableName;
+
+// Round-trips a RegionInfo through the "PBUF"-prefixed text form and checks
+// that every field that was populated survives folly::to<RegionInfo>().
+TEST(TestRegionInfoDesializer, TestDeserialize) {
+  string ns{"test_ns"};
+  string tn{"table_name"};
+  string start_row{"AAAAAA"};
+  string stop_row{"BBBBBBBBBBBB"};
+  uint64_t region_id = 2345678;
+
+  RegionInfo ri_out;
+  ri_out.set_region_id(region_id);
+  ri_out.mutable_table_name()->set_namespace_(ns);
+  ri_out.mutable_table_name()->set_qualifier(tn);
+  ri_out.set_start_key(start_row);
+  ri_out.set_end_key(stop_row);
+
+  // The deserializer expects the preamble in front of the protobuf bytes.
+  string header{"PBUF"};
+  string ser = header + ri_out.SerializeAsString();
+
+  auto out = folly::to<RegionInfo>(ser);
+
+  EXPECT_EQ(region_id, out.region_id());
+  // Previously only region_id was verified, so a deserializer that lost the
+  // remaining fields would still have passed.
+  EXPECT_EQ(ns, out.table_name().namespace_());
+  EXPECT_EQ(tn, out.table_name().qualifier());
+  EXPECT_EQ(start_row, out.start_key());
+  EXPECT_EQ(stop_row, out.end_key());
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/region-info.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h
new file mode 100644
index 0000000..6af351c
--- /dev/null
+++ b/hbase-native-client/serde/region-info.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include "if/HBase.pb.h"
+
+#include <folly/Conv.h>
+#include <boost/algorithm/string/predicate.hpp>
+
+namespace hbase {
+namespace pb {
+// Provide folly::to<RegionInfo>(text).
+//
+// The text must start with the marker "PBUF"; the bytes after the marker are
+// parsed into out as a protobuf. Throws std::runtime_error when the marker is
+// absent or when the remaining bytes do not parse.
+template <class String> void parseTo(String in, RegionInfo& out) {
+  const char magic[] = "PBUF";
+  const auto magic_len = sizeof(magic) - 1;  // do not count the trailing '\0'
+
+  // TODO(eclark): there has to be something better.
+  const std::string raw = folly::to<std::string>(in);
+
+  const bool has_magic = boost::starts_with(raw, magic);
+  if (!has_magic) {
+    throw std::runtime_error("Region Info field doesn't contain preamble");
+  }
+
+  // Hand only the part after the marker to protobuf.
+  const bool parsed =
+      out.ParseFromArray(raw.data() + magic_len, raw.size() - magic_len);
+  if (!parsed) {
+    throw std::runtime_error("Bad protobuf for RegionInfo");
+  }
+}
+} // namespace pb
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/rpc.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc
new file mode 100644
index 0000000..4c3c999
--- /dev/null
+++ b/hbase-native-client/serde/rpc.cc
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "serde/rpc.h"
+
+#include <folly/Logging.h>
+#include <folly/Logging.h>
+#include <folly/io/Cursor.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/message.h>
+
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+
+using namespace hbase;
+
+using folly::IOBuf;
+using folly::io::RWPrivateCursor;
+using google::protobuf::Message;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::io::CodedOutputStream;
+using google::protobuf::io::ZeroCopyOutputStream;
+using std::string;
+using std::unique_ptr;
+
+static const std::string PREAMBLE = "HBas";
+static const std::string INTERFACE = "ClientService";
+static const uint8_t RPC_VERSION = 0;
+static const uint8_t DEFAULT_AUTH_TYPE = 80;
+
+// Reads one varint-length-prefixed protobuf out of buf and merges it into msg.
+//
+// Returns the coded stream's position once parsing is done, or a negative
+// code on failure: -2 when buf or msg is null, -3 when the length varint
+// cannot be read, -4 when the message does not parse, -5 when
+// ConsumedEntireMessage() reports false.
+int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
+  if (!buf || !msg) {
+    return -2;
+  }
+
+  // data()/length() below are taken from this one buffer only.
+  DCHECK(!buf->isChained());
+
+  ArrayInputStream raw_input{buf->data(), static_cast<int>(buf->length())};
+  CodedInputStream input{&raw_input};
+
+  // Try and read the varint holding the size of the message.
+  uint32_t payload_len;
+  if (!input.ReadVarint32(&payload_len)) {
+    FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
+    return -3;
+  }
+
+  // Limit the parse to the announced size, then parse the message.
+  input.PushLimit(payload_len);
+  if (!msg->MergeFromCodedStream(&input)) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Unable to read a protobuf message from data.";
+    return -4;
+  }
+
+  // Make sure all the data was consumed.
+  if (!input.ConsumedEntireMessage()) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Orphaned data left after reading protobuf message";
+    return -5;
+  }
+
+  return input.CurrentPosition();
+}
+
+// Starts out with the default auth type; Preamble() writes it on the wire.
+RpcSerde::RpcSerde() : auth_type_{DEFAULT_AUTH_TYPE} {}
+// Nothing to release explicitly.
+RpcSerde::~RpcSerde() = default;
+
+// Builds the connection preamble: the magic string followed by the RPC
+// version byte and the auth type byte.
+unique_ptr<IOBuf> RpcSerde::Preamble() {
+  // Copy the magic string, asking for two spare bytes behind it.
+  auto preamble = IOBuf::copyBuffer(PREAMBLE, 0, 2);
+  // Make those two bytes part of the buffer's data so they can be written.
+  preamble->append(2);
+
+  RWPrivateCursor cursor(preamble.get());
+  // Step over the four magic bytes that are already in place.
+  cursor.skip(4);
+  // Version
+  cursor.write(RPC_VERSION);
+  // Standard security aka Please don't lie to me.
+  cursor.write(auth_type_);
+  return preamble;
+}
+
+// Builds the connection header for the given user: a serialized
+// ConnectionHeader protobuf with its length prepended.
+unique_ptr<IOBuf> RpcSerde::Header(const string &user) {
+  pb::ConnectionHeader conn_header;
+
+  // The service name that we want to talk to.
+  //
+  // Right now we're completely ignoring the service interface.
+  // That may or may not be the correct thing to do.
+  // It worked for a while with the java client; until it
+  // didn't.
+  conn_header.set_service_name(INTERFACE);
+
+  // TODO(eclark): Make this not a total lie.
+  conn_header.mutable_user_info()->set_effective_user(user);
+
+  auto body = SerializeMessage(conn_header);
+  return PrependLength(std::move(body));
+}
+
+// Builds one request frame: a delimited RequestHeader, followed by the
+// delimited request message when msg is not null, with the total length
+// prepended.
+unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id,
+                                    const string &method, const Message *msg) {
+  const bool has_param = (msg != nullptr);
+
+  pb::RequestHeader header;
+  header.set_call_id(call_id);
+  header.set_method_name(method);
+  // Tell the receiver whether a request message follows the header.
+  header.set_request_param(has_param);
+
+  auto frame = SerializeDelimited(header);
+  if (has_param) {
+    frame->appendChain(SerializeDelimited(*msg));
+  }
+
+  return PrependLength(std::move(frame));
+}
+
+// Returns a chain that starts with a 4-byte big-endian length and continues
+// with msg; the length is the data size of the whole msg chain.
+unique_ptr<IOBuf> RpcSerde::PrependLength(unique_ptr<IOBuf> msg) {
+  // Get the size of the data to be pushed out the network.
+  const auto payload_len = msg->computeChainDataLength();
+
+  // Java ints are 4 long. So create a buffer that large and make those
+  // bytes visible.
+  auto prefix = IOBuf::create(4);
+  prefix->append(4);
+
+  // Write the length into the prefix buffer.
+  RWPrivateCursor cursor(prefix.get());
+  cursor.writeBE(static_cast<uint32_t>(payload_len));
+
+  // Then attach the original to the back of the prefix.
+  prefix->appendChain(std::move(msg));
+  return prefix;
+}
+
+// Serializes msg into a single new buffer laid out as
+// <varint32 size of msg><bytes of msg>.
+unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) {
+ // Get the buffer size needed for just the message.
+ // NOTE(review): SerializeWithCachedSizesToArray() below presumably relies on
+ // the sizes cached by this ByteSize() call - confirm against protobuf docs.
+ int msg_size = msg.ByteSize();
+ int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
+
+ // Create a buffer big enough to hold the varint and the object, and make
+ // all of it part of the buffer's data.
+ auto buf = IOBuf::create(buf_size);
+ buf->append(buf_size);
+
+ // Create the array output stream over the buffer's writable bytes.
+ ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
+ // Wrap the ArrayOutputStream in the coded output stream to allow writing
+ // Varint32
+ CodedOutputStream cos{&aos};
+
+ // Write out the size.
+ cos.WriteVarint32(msg_size);
+
+ // Now write the rest out.
+ // We're using the protobuf output streams here to keep track
+ // of where in the output array we are rather than IOBuf.
+ msg.SerializeWithCachedSizesToArray(
+ cos.GetDirectBufferForNBytesAndAdvance(msg_size));
+
+ // Return the buffer.
+ return buf;
+}
+// Serializes msg without any length prefix.
+// TODO(eclark): Make this 1 copy.
+unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) {
+  // The message is first rendered into a temporary string and those bytes
+  // are then copied into a new IOBuf.
+  const std::string wire_bytes = msg.SerializeAsString();
+  return IOBuf::copyBuffer(wire_bytes);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/rpc.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc.h
new file mode 100644
index 0000000..cefb583
--- /dev/null
+++ b/hbase-native-client/serde/rpc.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+// Forward
+namespace folly {
+class IOBuf;
+}
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
+namespace hbase {
+// Serializes and deserializes the pieces exchanged on an RPC connection:
+// the preamble, the connection header, request frames and varint-delimited
+// protobuf messages.
+//
+// NOTE(review): uint8_t / uint32_t are used below, but this header only
+// includes <memory> and <string>; consider including <cstdint> directly.
+class RpcSerde {
+public:
+ // Starts out with the default auth type.
+ RpcSerde();
+ virtual ~RpcSerde();
+ // Reads one varint-length-prefixed protobuf from buf and merges it into msg.
+ // Returns the stream position after parsing, or a negative value on failure.
+ int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
+ // Magic string followed by the RPC version byte and the auth type byte.
+ std::unique_ptr<folly::IOBuf> Preamble();
+ // Length-prefixed ConnectionHeader naming user as the effective user.
+ std::unique_ptr<folly::IOBuf> Header(const std::string &user);
+ // Length-prefixed frame holding a delimited RequestHeader and, when msg is
+ // not null, the delimited msg.
+ std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id,
+ const std::string &method,
+ const google::protobuf::Message *msg);
+ // msg preceded by its size encoded as a varint.
+ std::unique_ptr<folly::IOBuf>
+ SerializeDelimited(const google::protobuf::Message &msg);
+
+ // The serialized bytes of msg with no length prefix.
+ std::unique_ptr<folly::IOBuf>
+ SerializeMessage(const google::protobuf::Message &msg);
+
+ // msg with a 4-byte big-endian length of the whole chain put in front.
+ std::unique_ptr<folly::IOBuf>
+ PrependLength(std::unique_ptr<folly::IOBuf> msg);
+
+private:
+ /* data */
+ // Auth type byte that Preamble() writes after the version byte.
+ uint8_t auth_type_;
+};
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/server-name-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/server-name-test.cc b/hbase-native-client/serde/server-name-test.cc
new file mode 100644
index 0000000..35dcbc1
--- /dev/null
+++ b/hbase-native-client/serde/server-name-test.cc
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "serde/server-name.h"
+
+#include <gtest/gtest.h>
+#include <string>
+
+using hbase::pb::ServerName;
+
+// "host:port" text must be split into the two ServerName fields.
+TEST(TestServerName, TestMakeServerName) {
+  const ServerName parsed = folly::to<ServerName>("test:123");
+
+  ASSERT_EQ("test", parsed.host_name());
+  ASSERT_EQ(123, parsed.port());
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/server-name.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/server-name.h b/hbase-native-client/serde/server-name.h
new file mode 100644
index 0000000..bdba087
--- /dev/null
+++ b/hbase-native-client/serde/server-name.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <stdexcept>
+#include <string>
+
+#include "if/HBase.pb.h"
+#include <folly/Conv.h>
+#include <folly/String.h>
+
+namespace hbase {
+namespace pb {
+
+// Provide folly::to<ServerName>("host:port").
+//
+// The text is split on its last ':'. What precedes it becomes the host name,
+// what follows is converted to the port with folly::to<int>.
+// Throws std::runtime_error when the text contains no ':' at all.
+template <class String> void parseTo(String in, ServerName &out) {
+  // TODO see about getting rsplit into folly.
+  const std::string s = folly::to<std::string>(in);
+
+  const auto delim = s.rfind(':');
+  if (delim == std::string::npos) {
+    // A DCHECK alone disappears in release builds. There npos + 1 wraps to 0,
+    // so both substr() calls would return the whole text and an all-digit
+    // input would be accepted as host and port at the same time.
+    throw std::runtime_error("ServerName is missing ':' between host and port");
+  }
+
+  out.set_host_name(s.substr(0, delim));
+  // Now keep everything after the : (delim + 1) to the end.
+  out.set_port(folly::to<int>(s.substr(delim + 1)));
+}
+}
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/table-name-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/table-name-test.cc b/hbase-native-client/serde/table-name-test.cc
new file mode 100644
index 0000000..877d522
--- /dev/null
+++ b/hbase-native-client/serde/table-name-test.cc
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "serde/table-name.h"
+
+using namespace hbase;
+using hbase::pb::TableName;
+
+// With no namespace set, only the qualifier is printed.
+TEST(TestTableName, TestToStringNoDefault) {
+  TableName table;
+  table.set_qualifier("TestTableName");
+
+  const auto printed = folly::to<std::string>(table);
+
+  ASSERT_EQ(printed.find("default"), std::string::npos);
+  ASSERT_EQ("TestTableName", printed);
+}
+
+// An explicitly set "default" namespace is left out of the printed name too.
+TEST(TestTableName, TestToStringNoDefaltWhenSet) {
+  TableName table;
+  table.set_qualifier("TestTableName");
+  table.set_namespace_("default");
+
+  const auto printed = folly::to<std::string>(table);
+
+  ASSERT_EQ(printed.find("default"), std::string::npos);
+  ASSERT_EQ("TestTableName", printed);
+}
+
+// A namespace other than "default" is printed in front of the qualifier.
+TEST(TestTableName, TestToStringIncludeNS) {
+  TableName tn;
+  tn.set_namespace_("hbase");
+  tn.set_qualifier("acl");
+  std::string result = folly::to<std::string>(tn);
+  // find() returns an unsigned size_type; compare it with an unsigned zero so
+  // the assertion does not mix signed and unsigned operands, and keep the
+  // expected value first like the assertion below.
+  ASSERT_EQ(std::string::size_type{0}, result.find("hbase"));
+  ASSERT_EQ("hbase:acl", result);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/table-name.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/table-name.h b/hbase-native-client/serde/table-name.h
new file mode 100644
index 0000000..c81e166
--- /dev/null
+++ b/hbase-native-client/serde/table-name.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "if/HBase.pb.h"
+#include <folly/Conv.h>
+#include <folly/String.h>
+
+namespace hbase {
+namespace pb {
+
+// Provide folly::to<std::string>(TableName);
+//
+// A table with no namespace, or with the namespace "default", is appended as
+// the bare qualifier; any other table is appended as "namespace:qualifier".
+template <class String> void toAppend(const TableName &in, String *result) {
+  const bool show_namespace =
+      in.has_namespace_() && in.namespace_() != "default";
+  if (show_namespace) {
+    folly::toAppend(in.namespace_(), ':', in.qualifier(), result);
+    return;
+  }
+  folly::toAppend(in.qualifier(), result);
+}
+
+// Provide folly::to<TableName>(text).
+//
+// The text is split on ':'. A single piece is taken as a qualifier in the
+// "default" namespace; otherwise the first piece is the namespace and the
+// second one the qualifier.
+template <class String> void parseTo(String in, TableName &out) {
+  // NOTE(review): std::vector is used here although <vector> is not among
+  // the headers this file includes directly.
+  std::vector<std::string> parts;
+  folly::split(":", in, parts);
+
+  if (parts.size() != 1) {
+    out.set_namespace_(parts[0]);
+    out.set_qualifier(parts[1]);
+    return;
+  }
+
+  out.set_namespace_("default");
+  out.set_qualifier(parts[0]);
+}
+
+} // namespace pb
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/zk-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/zk-deserializer-test.cc b/hbase-native-client/serde/zk-deserializer-test.cc
index 92d85a0..f07eecf 100644
--- a/hbase-native-client/serde/zk-deserializer-test.cc
+++ b/hbase-native-client/serde/zk-deserializer-test.cc
@@ -17,7 +17,7 @@
*
*/
-#include "serde/zk-deserializer.h"
+#include "serde/zk.h"
#include <folly/Logging.h>
#include <folly/io/Cursor.h>
@@ -41,7 +41,7 @@ TEST(TestZkDesializer, TestThrowNoMagicNum) {
buf->append(100);
RWPrivateCursor c{buf.get()};
c.write<uint8_t>(99);
- ASSERT_THROW(deser.parse(buf.get(), &mrs), runtime_error);
+ ASSERT_THROW(deser.Parse(buf.get(), &mrs), runtime_error);
}
// Test if the protobuf is in a format that we can't decode
@@ -78,7 +78,7 @@ TEST(TestZkDesializer, TestBadProtoThrow) {
// Create the protobuf
MetaRegionServer out;
- ASSERT_THROW(deser.parse(buf.get(), &out), runtime_error);
+ ASSERT_THROW(deser.Parse(buf.get(), &out), runtime_error);
}
// Test to make sure the whole thing works.
@@ -118,6 +118,6 @@ TEST(TestZkDesializer, TestNoThrow) {
// Create the protobuf
MetaRegionServer out;
- ASSERT_TRUE(deser.parse(buf.get(), &out));
+ ASSERT_TRUE(deser.Parse(buf.get(), &out));
ASSERT_EQ(mrs.server().host_name(), out.server().host_name());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/zk-deserializer.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/zk-deserializer.cc b/hbase-native-client/serde/zk-deserializer.cc
deleted file mode 100644
index 33cf809..0000000
--- a/hbase-native-client/serde/zk-deserializer.cc
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "serde/zk-deserializer.h"
-
-#include <folly/io/Cursor.h>
-#include <folly/io/IOBuf.h>
-#include <google/protobuf/message.h>
-
-using hbase::ZkDeserializer;
-using std::runtime_error;
-using folly::IOBuf;
-using folly::io::Cursor;
-using google::protobuf::Message;
-
-static const std::string MAGIC_STRING = "PBUF";
-
-bool ZkDeserializer::parse(IOBuf *buf, Message *out) {
-
- // The format is like this
- // 1 byte of magic number. 255
- // 4 bytes of id length.
- // id_length number of bytes for the id of who put up the znode
- // 4 bytes of a magic string PBUF
- // Then the protobuf serialized without a varint header.
-
- Cursor c{buf};
-
- // There should be a magic number for recoverable zk
- uint8_t magic_num = c.read<uint8_t>();
- if (magic_num != 255) {
- LOG(ERROR) << "Magic number not in ZK znode data expected 255 got ="
- << unsigned(magic_num);
- throw runtime_error("Magic number not in znode data");
- }
- // How long is the id?
- uint32_t id_len = c.readBE<uint32_t>();
-
- if (id_len >= c.length()) {
- LOG(ERROR) << "After skiping the if from zookeeper data there's not enough "
- "left to read anything else";
- throw runtime_error("Not enough bytes to decode from zookeeper");
- }
-
- // Skip the id
- c.skip(id_len);
-
- // Make sure that the magic string is there.
- if (MAGIC_STRING != c.readFixedString(4)) {
- LOG(ERROR) << "There was no PBUF magic string.";
- throw runtime_error("No PBUF magic string in the zookpeeper data.");
- }
-
- // Try to decode the protobuf.
- // If there's an error bail out.
- if (out->ParseFromArray(c.data(), c.length()) == false) {
- LOG(ERROR) << "Error parsing Protobuf Message";
- throw runtime_error("Error parsing protobuf");
- }
-
- return true;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/zk-deserializer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/zk-deserializer.h b/hbase-native-client/serde/zk-deserializer.h
deleted file mode 100644
index aa91661..0000000
--- a/hbase-native-client/serde/zk-deserializer.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-namespace folly {
-class IOBuf;
-}
-
-namespace hbase {
-class ZkDeserializer {
-public:
- bool parse(folly::IOBuf *buf, google::protobuf::Message *out);
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/zk.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/zk.cc b/hbase-native-client/serde/zk.cc
new file mode 100644
index 0000000..59871a5
--- /dev/null
+++ b/hbase-native-client/serde/zk.cc
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "serde/zk.h"
+
+#include <folly/io/Cursor.h>
+#include <folly/io/IOBuf.h>
+#include <google/protobuf/message.h>
+
+using hbase::ZkDeserializer;
+using std::runtime_error;
+using folly::IOBuf;
+using folly::io::Cursor;
+using google::protobuf::Message;
+
+static const std::string MAGIC_STRING = "PBUF";
+
+// Decodes a protobuf message stored in a ZooKeeper znode.
+//
+// The format is like this
+//   1 byte of magic number. 255
+//   4 bytes of id length (big endian).
+//   id_length number of bytes for the id of who put up the znode
+//   4 bytes of a magic string PBUF
+//   Then the protobuf serialized without a varint header.
+//
+// Returns true once *out has been filled in. Throws std::runtime_error when
+// the data is too short, the magic number or magic string is missing, or
+// the protobuf can't be parsed.
+bool ZkDeserializer::Parse(IOBuf *buf, Message *out) {
+  Cursor c{buf};
+
+  // Validate sizes before every read. Reading past the end makes the cursor
+  // raise its own exception type (NOTE(review): std::out_of_range in folly,
+  // confirm) rather than the runtime_error used for every other failure.
+  if (c.totalLength() < sizeof(uint8_t) + sizeof(uint32_t)) {
+    LOG(ERROR) << "ZK znode data is too short to hold the magic number and "
+                  "the id length";
+    throw runtime_error("Not enough bytes to decode from zookeeper");
+  }
+
+  // There should be a magic number for recoverable zk
+  const uint8_t magic_num = c.read<uint8_t>();
+  if (magic_num != 255) {
+    LOG(ERROR) << "Magic number not in ZK znode data expected 255 got ="
+               << unsigned(magic_num);
+    throw runtime_error("Magic number not in znode data");
+  }
+
+  // How long is the id?
+  const uint32_t id_len = c.readBE<uint32_t>();
+
+  // The id has to be followed by at least the PBUF marker. Subtract from
+  // what is left instead of adding to id_len so a huge id_len can't wrap.
+  const auto remaining = c.totalLength();
+  if (id_len > remaining || remaining - id_len < MAGIC_STRING.size()) {
+    LOG(ERROR) << "After skipping the id from zookeeper data there's not "
+                  "enough left to read anything else";
+    throw runtime_error("Not enough bytes to decode from zookeeper");
+  }
+
+  // Skip the id
+  c.skip(id_len);
+
+  // Make sure that the magic string is there.
+  if (MAGIC_STRING != c.readFixedString(MAGIC_STRING.size())) {
+    LOG(ERROR) << "There was no PBUF magic string.";
+    throw runtime_error("No PBUF magic string in the zookeeper data.");
+  }
+
+  // Try to decode the protobuf.
+  // If there's an error bail out.
+  // NOTE(review): data()/length() presumably cover only the cursor's current
+  // buffer, so this assumes the znode data arrives as one contiguous IOBuf -
+  // confirm against the callers.
+  if (!out->ParseFromArray(c.data(), c.length())) {
+    LOG(ERROR) << "Error parsing Protobuf Message";
+    throw runtime_error("Error parsing protobuf");
+  }
+
+  return true;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/190fc7aa/hbase-native-client/serde/zk.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/zk.h b/hbase-native-client/serde/zk.h
new file mode 100644
index 0000000..b672bf4
--- /dev/null
+++ b/hbase-native-client/serde/zk.h
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+namespace folly {
+class IOBuf;
+}
+
+namespace hbase {
+// Decodes protobuf messages from the framed data stored in a ZooKeeper
+// znode (magic number, writer id, "PBUF" marker, then the protobuf).
+class ZkDeserializer {
+public:
+ // Parses the znode data in buf into out.
+ // Returns true on success. Throws std::runtime_error if the magic number
+ // or the PBUF magic string is missing, if there is not enough data after
+ // the id, or if the protobuf itself fails to parse.
+ bool Parse(folly::IOBuf *buf, google::protobuf::Message *out);
+};
+} // namespace hbase