You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/07/21 23:33:00 UTC
hbase git commit: HBASE-18338 [C++] Implement RpcTestServer (Xiaobing
Zhou)
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 a93c6a998 -> 1193812d7
HBASE-18338 [C++] Implement RpcTestServer (Xiaobing Zhou)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1193812d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1193812d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1193812d
Branch: refs/heads/HBASE-14850
Commit: 1193812d784f407ab8596380e003b65de27a117a
Parents: a93c6a9
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Jul 21 16:29:44 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jul 21 16:29:44 2017 -0700
----------------------------------------------------------------------
hbase-native-client/connection/BUCK | 13 ++
.../connection/client-handler.cc | 21 +-
hbase-native-client/connection/client-handler.h | 7 +-
hbase-native-client/connection/pipeline.cc | 12 +-
.../connection/rpc-test-server-handler.cc | 77 ++++++
.../connection/rpc-test-server-handler.h | 47 ++++
.../connection/rpc-test-server.cc | 70 ++++++
.../connection/rpc-test-server.h | 50 ++++
hbase-native-client/connection/rpc-test.cc | 86 +++++++
hbase-native-client/connection/sasl-handler.h | 2 +-
hbase-native-client/if/test.proto | 43 ++++
hbase-native-client/if/test_rpc_service.proto | 35 +++
hbase-native-client/serde/BUCK | 4 +-
.../serde/client-deserializer-test.cc | 3 +-
.../serde/client-serializer-test.cc | 2 +-
hbase-native-client/serde/rpc-serde.cc | 234 +++++++++++++++++++
hbase-native-client/serde/rpc-serde.h | 141 +++++++++++
hbase-native-client/serde/rpc.cc | 222 ------------------
hbase-native-client/serde/rpc.h | 125 ----------
19 files changed, 827 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index c3119eb..aaf8fdb 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -33,6 +33,8 @@ cxx_library(
"service.h",
"rpc-client.h",
"sasl-util.h",
+ "rpc-test-server.h",
+ "rpc-test-server-handler.h",
],
srcs=[
"client-dispatcher.cc",
@@ -44,6 +46,8 @@ cxx_library(
"rpc-client.cc",
"sasl-handler.cc",
"sasl-util.cc",
+ "rpc-test-server.cc",
+ "rpc-test-server-handler.cc",
],
deps=[
"//if:if",
@@ -68,3 +72,12 @@ cxx_test(
deps=[
":connection",
],)
+# Test target built from rpc-test.cc; it links against the :connection library,
+# which is where the RPC test server sources are compiled.
+cxx_test(
+ name="rpc-test",
+ srcs=[
+ "rpc-test.cc",
+ ],
+ deps=[
+ ":connection",
+ ],
+ run_test_separately=True,)
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 052c171..39227d3 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -35,9 +35,10 @@ using google::protobuf::Message;
namespace hbase {
ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
- const std::string &server)
+ std::shared_ptr<Configuration> conf, const std::string &server)
: user_name_(user_name),
serde_(codec),
+ conf_(conf),
server_(server),
once_flag_(std::make_unique<std::once_flag>()),
resp_msgs_(
@@ -115,13 +116,17 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
}
folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
- // We need to send the header once.
- // So use call_once to make sure that only one thread wins this.
- std::call_once((*once_flag_), [ctx, this]() {
- VLOG(3) << "Writing RPC Header to server: " << server_;
- auto header = serde_.Header(user_name_);
- ctx->fireWrite(std::move(header));
- });
+ /* for RPC test, there's no need to send connection header */
+ if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+ RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+ // We need to send the header once.
+ // So use call_once to make sure that only one thread wins this.
+ std::call_once((*once_flag_), [ctx, this]() {
+ VLOG(3) << "Writing RPC Header to server: " << server_;
+ auto header = serde_.Header(user_name_);
+ ctx->fireWrite(std::move(header));
+ });
+ }
VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_;
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/client-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index 8de3a8b..b6f19a2 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -26,9 +26,10 @@
#include <string>
#include <utility>
+#include "core/configuration.h"
#include "exceptions/exception.h"
#include "serde/codec.h"
-#include "serde/rpc.h"
+#include "serde/rpc-serde.h"
#include "utils/concurrent-map.h"
// Forward decs.
@@ -60,7 +61,8 @@ class ClientHandler
* Create the handler
* @param user_name the user name of the user running this process.
*/
- ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, const std::string &server);
+ ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
+ std::shared_ptr<Configuration> conf, const std::string &server);
/**
* Get bytes from the wire.
@@ -79,6 +81,7 @@ class ClientHandler
std::string user_name_;
RpcSerde serde_;
std::string server_; // for logging
+ std::shared_ptr<Configuration> conf_;
// in flight requests
std::unique_ptr<concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>> resp_msgs_;
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/pipeline.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc
index 2844752..9c790b6 100644
--- a/hbase-native-client/connection/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -32,7 +32,6 @@ namespace hbase {
RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec,
std::shared_ptr<Configuration> conf)
: user_util_(), codec_(codec), conf_(conf) {}
-
SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
std::shared_ptr<folly::AsyncTransportWrapper> sock) {
folly::SocketAddress addr; // for logging
@@ -41,10 +40,15 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
auto pipeline = SerializePipeline::create();
pipeline->addBack(wangle::AsyncSocketHandler{sock});
pipeline->addBack(wangle::EventBaseHandler{});
- auto secure = security::User::IsSecurityEnabled(*conf_);
- pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+ bool secure = false;
+ /* for RPC test, there's no need to setup Sasl */
+ if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+ RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+ secure = security::User::IsSecurityEnabled(*conf_);
+ pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+ }
pipeline->addBack(wangle::LengthFieldBasedFrameDecoder{});
- pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, addr.describe()});
+ pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, conf_, addr.describe()});
pipeline->finalize();
return pipeline;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
new file mode 100644
index 0000000..7d2f407
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server-handler.cc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/rpc-test-server-handler.h"
+#include "if/RPC.pb.h"
+#include "if/test.pb.h"
+
+namespace hbase {
+
+// Decodes one frame into a Request: a delimited RequestHeader, then (optionally) the param.
+void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) {
+  buf->coalesce();  // ParseDelimited DCHECKs that the buffer is not chained
+  pb::RequestHeader header;
+  int used_bytes = serde_.ParseDelimited(buf.get(), &header);
+  VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" << header.call_id();
+  if (used_bytes < 0) return;  // negative result is an error code; never trim the buffer by it
+
+  auto received = CreateReceivedRequest(header.method_name());
+  if (received == nullptr) return;  // unknown method: nothing to pass up the pipeline
+  // Always carry the call id so the reply can be matched, even without a request param.
+  received->set_call_id(header.call_id());
+
+  buf->trimStart(used_bytes);
+  if (header.has_request_param()) {
+    used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get());
+    VLOG(3) << "Read RPCRequest, buf length:" << buf->length()
+            << ", header PB length:" << used_bytes;
+  }
+  ctx->fireRead(std::move(received));
+}
+
+folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx,
+                                                                std::unique_ptr<Response> r) {
+  VLOG(3) << "Writing RPC Response";
+  // Serialize the response together with its call id and send the bytes down the pipeline.
+  return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get()));
+}
+
+// Builds an empty Request whose request/response protobufs match the named test RPC.
+// Returns nullptr when the method name is not one of the test service's methods.
+std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
+    const std::string& method_name) {
+  if (method_name == "ping" || method_name == "error") {
+    return std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                     std::make_shared<EmptyResponseProto>(), method_name);
+  }
+  if (method_name == "echo") {
+    return std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+                                     std::make_shared<EchoResponseProto>(), method_name);
+  }
+  if (method_name == "pause") {
+    return std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
+                                     std::make_shared<EmptyResponseProto>(), method_name);
+  }
+  if (method_name == "addr") {
+    return std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                     std::make_shared<AddrResponseProto>(), method_name);
+  }
+  return nullptr;
+}
+} // end of namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h
new file mode 100644
index 0000000..4c84615
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server-handler.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <wangle/channel/Handler.h>
+
+#include "connection/request.h"
+#include "connection/response.h"
+#include "serde/rpc-serde.h"
+
+// No file-scope using-directive here: it would leak into every file that includes this header.
+
+namespace hbase {
+// Test-server pipeline stage; a real rpc server would probably use generated client/server stubs
+class RpcTestServerSerializeHandler
+    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Request>,
+                             std::unique_ptr<Response>, std::unique_ptr<folly::IOBuf>> {
+ public:
+  RpcTestServerSerializeHandler() : serde_() {}
+
+  // Inbound path: receives raw bytes and produces a Request for the next handler.
+  void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override;
+  // Outbound path: receives a Response and produces raw bytes for the previous handler.
+  folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> r) override;
+
+ private:
+  // Creates the Request object used to hold a received call for the given method name.
+  std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name);
+  hbase::RpcSerde serde_;
+};
+}  // end of namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
new file mode 100644
index 0000000..d3a30b1
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/codec/LengthFieldBasedFrameDecoder.h>
+#include <wangle/codec/LengthFieldPrepender.h>
+#include <wangle/service/ServerDispatcher.h>
+
+#include "connection/rpc-test-server-handler.h"
+#include "connection/rpc-test-server.h"
+#include "if/test.pb.h"
+
+namespace hbase {
+
+RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline(
+    std::shared_ptr<folly::AsyncTransportWrapper> sock) {
+  using Dispatcher =
+      wangle::MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>;
+  // Handlers are appended in socket-to-service order.
+  auto server_pipeline = RpcTestServerSerializePipeline::create();
+  server_pipeline->addBack(wangle::AsyncSocketHandler(sock));
+  server_pipeline->addBack(wangle::EventBaseHandler());  // ensure we can write from any thread
+  server_pipeline->addBack(wangle::LengthFieldBasedFrameDecoder());
+  server_pipeline->addBack(RpcTestServerSerializeHandler());
+  server_pipeline->addBack(Dispatcher(&service_));
+  server_pipeline->finalize();
+  return server_pipeline;
+}
+
+// Serves one test RPC. The reply always carries the request's call id; a response message
+// is attached only for the methods implemented so far ("ping" and "echo").
+Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) {
+  const std::string rpc_name = request->method();
+
+  auto reply = std::make_unique<Response>();
+  reply->set_call_id(request->call_id());
+
+  if (rpc_name == "ping") {
+    auto ping_out = std::make_shared<EmptyResponseProto>();
+    reply->set_resp_msg(ping_out);
+  } else if (rpc_name == "echo") {
+    // Copy the incoming text straight back to the caller.
+    auto echo_in = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+    auto echo_out = std::make_shared<EchoResponseProto>();
+    echo_out->set_message(echo_in->message());
+    reply->set_resp_msg(echo_out);
+  }
+  // TODO: "error", "pause" and "addr" are not implemented yet; as before, their replies
+  // are returned without a response message.
+
+  return folly::makeFuture<std::unique_ptr<Response>>(std::move(reply));
+}
+
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h
new file mode 100644
index 0000000..c3225ff
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <wangle/service/ExecutorFilter.h>
+#include <wangle/service/Service.h>
+
+#include "connection/request.h"
+#include "connection/response.h"
+
+using namespace hbase;
+using namespace folly;
+using namespace wangle;
+
+namespace hbase {
+// Pipeline type used by the test server: IOBufQueue& and std::unique_ptr<Response> are the
+// two type parameters handed to wangle::Pipeline.
+using RpcTestServerSerializePipeline =
+    wangle::Pipeline<folly::IOBufQueue&, std::unique_ptr<Response>>;
+
+// Service that turns a Request into a future of a Response for the RPC test server.
+class RpcTestService
+    : public wangle::Service<std::unique_ptr<Request>, std::unique_ptr<Response>> {
+ public:
+  RpcTestService() = default;
+  virtual ~RpcTestService() = default;
+  folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> request) override;
+};
+
+// Factory that hands out a RpcTestServerSerializePipeline for a given transport.
+class RpcTestServerPipelineFactory
+    : public wangle::PipelineFactory<RpcTestServerSerializePipeline> {
+ public:
+  RpcTestServerSerializePipeline::Ptr newPipeline(
+      std::shared_ptr<folly::AsyncTransportWrapper> sock) override;
+
+ private:
+  // RpcTestService wrapped in an ExecutorFilter constructed with a CPUThreadPoolExecutor(1).
+  wangle::ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>> service_{
+      std::make_shared<wangle::CPUThreadPoolExecutor>(1), std::make_shared<RpcTestService>()};
+};
+}  // end of namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
new file mode 100644
index 0000000..d4cd89f
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <wangle/bootstrap/ClientBootstrap.h>
+#include <wangle/channel/Handler.h>
+
+#include <folly/Logging.h>
+#include <folly/SocketAddress.h>
+#include <folly/String.h>
+#include <folly/experimental/TestUtil.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <boost/thread.hpp>
+
+#include "connection/rpc-client.h"
+#include "if/test.pb.h"
+#include "rpc-test-server.h"
+#include "security/user.h"
+#include "serde/rpc-serde.h"
+
+using namespace wangle;
+using namespace folly;
+using namespace hbase;
+
+DEFINE_int32(port, 0, "test server port");
+
+// Round-trips an "echo" call through an in-process RPC test server and checks the payload.
+TEST(RpcTestServer, echo) {
+  /* create conf; RPC test mode makes the client skip the connection header and Sasl setup */
+  auto conf = std::make_shared<Configuration>();
+  conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true");
+
+  /* create rpc test server */
+  auto server = std::make_shared<ServerBootstrap<RpcTestServerSerializePipeline>>();
+  server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>());
+  server->bind(FLAGS_port);
+  folly::SocketAddress server_addr;
+  server->getSockets()[0]->getAddress(&server_addr);
+
+  /* create RpcClient */
+  auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto rpc_client = std::make_shared<RpcClient>(io_executor, nullptr, conf);
+
+  /* build the echo request */
+  const std::string greetings = "hello, hbase server!";
+  auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+                                           std::make_shared<EchoResponseProto>(), "echo");
+  auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+  pb_msg->set_message(greetings);
+
+  /*
+   * Send the request and block until the continuation has run. Without get() the test
+   * could stop the server and return before the response arrives, so the EXPECT_EQ might
+   * never execute. get() also rethrows a failed call with its original exception type,
+   * which replaces the old catch-and-`throw e;` block that sliced the exception.
+   */
+  rpc_client
+      ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request),
+                  hbase::security::User::defaultUser())
+      .then([&greetings](std::unique_ptr<Response> response) {
+        auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
+        VLOG(1) << "message returned: " + pb_resp->message();
+        EXPECT_EQ(greetings, pb_resp->message());
+      })
+      .get();
+
+  server->stop();
+  server->join();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/sasl-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h
index f606a23..81f4e81 100644
--- a/hbase-native-client/connection/sasl-handler.h
+++ b/hbase-native-client/connection/sasl-handler.h
@@ -30,7 +30,7 @@
#include "connection/sasl-util.h"
#include "connection/service.h"
#include "security/user.h"
-#include "serde/rpc.h"
+#include "serde/rpc-serde.h"
namespace hbase {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/if/test.proto
----------------------------------------------------------------------
diff --git a/hbase-native-client/if/test.proto b/hbase-native-client/if/test.proto
new file mode 100644
index 0000000..72b68e9
--- /dev/null
+++ b/hbase-native-client/if/test.proto
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestProtos";
+option java_generate_equals_and_hash = true;
+
+// Request without fields; the test service uses it for ping, error and addr.
+message EmptyRequestProto {
+}
+
+// Response without fields; the test service returns it from ping, error and pause.
+message EmptyResponseProto {
+}
+
+// Request of the echo call: the text to be sent back.
+message EchoRequestProto {
+ required string message = 1;
+}
+
+// Response of the echo call: the native test server fills it with the request's message.
+message EchoResponseProto {
+ required string message = 1;
+}
+
+// Request of the pause call.
+// NOTE(review): `ms` is presumably a duration in milliseconds - confirm against the server.
+message PauseRequestProto {
+ required uint32 ms = 1;
+}
+
+// Response of the addr call.
+// NOTE(review): `addr` presumably carries a network address as text - confirm its format.
+message AddrResponseProto {
+ required string addr = 1;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/if/test_rpc_service.proto
----------------------------------------------------------------------
diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto
new file mode 100644
index 0000000..5f91dc4
--- /dev/null
+++ b/hbase-native-client/if/test_rpc_service.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestRpcServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "test.proto";
+
+
+/**
+ * A protobuf service for use in tests.
+ *
+ * The method names below are the strings the native RPC test server matches on.
+ * In this commit the server only implements ping and echo; error, pause and addr
+ * are still marked TODO there.
+ */
+service TestProtobufRpcProto {
+ rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
+ rpc echo(EchoRequestProto) returns (EchoResponseProto);
+ rpc error(EmptyRequestProto) returns (EmptyResponseProto);
+ rpc pause(PauseRequestProto) returns (EmptyResponseProto);
+ rpc addr(EmptyRequestProto) returns (AddrResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
index 18e949c..a765884 100644
--- a/hbase-native-client/serde/BUCK
+++ b/hbase-native-client/serde/BUCK
@@ -22,13 +22,13 @@ cxx_library(
"cell-outputstream.h",
"codec.h",
"region-info.h",
- "rpc.h",
+ "rpc-serde.h",
"server-name.h",
"table-name.h",
"zk.h",
],
srcs=[
- "rpc.cc",
+ "rpc-serde.cc",
"zk.cc",
],
deps=[
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/client-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
index 054684d..1856047 100644
--- a/hbase-native-client/serde/client-deserializer-test.cc
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -16,12 +16,11 @@
* limitations under the License.
*
*/
-#include "serde/rpc.h"
-
#include <folly/io/IOBuf.h>
#include <gtest/gtest.h>
#include "if/Client.pb.h"
+#include "rpc-serde.h"
using namespace hbase;
using folly::IOBuf;
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/client-serializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
index 7d8b29c..306f2c2 100644
--- a/hbase-native-client/serde/client-serializer-test.cc
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -24,7 +24,7 @@
#include "if/HBase.pb.h"
#include "if/RPC.pb.h"
-#include "serde/rpc.h"
+#include "rpc-serde.h"
using namespace hbase;
using namespace hbase::pb;
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc-serde.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.cc b/hbase-native-client/serde/rpc-serde.cc
new file mode 100644
index 0000000..9e1f79a
--- /dev/null
+++ b/hbase-native-client/serde/rpc-serde.cc
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <folly/Logging.h>
+#include <folly/io/Cursor.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/message.h>
+#include <boost/algorithm/string.hpp>
+
+#include <utility>
+
+#include "if/RPC.pb.h"
+#include "rpc-serde.h"
+#include "utils/version.h"
+
+using folly::IOBuf;
+using folly::io::RWPrivateCursor;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::io::CodedOutputStream;
+using google::protobuf::io::ZeroCopyOutputStream;
+
+namespace hbase {
+
+static const std::string PREAMBLE = "HBas";
+static const std::string INTERFACE = "ClientService";
+static const uint8_t RPC_VERSION = 0;
+static const uint8_t DEFAULT_AUTH_TYPE = 80;
+static const uint8_t KERBEROS_AUTH_TYPE = 81;
+
+int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
+ if (buf == nullptr || msg == nullptr) {
+ return -2;
+ }
+
+ DCHECK(!buf->isChained());
+
+ ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())};
+ CodedInputStream coded_stream{&ais};
+
+ uint32_t msg_size;
+
+ // Try and read the varint.
+ if (coded_stream.ReadVarint32(&msg_size) == false) {
+ FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
+ return -3;
+ }
+
+ coded_stream.PushLimit(msg_size);
+ // Parse the message.
+ if (msg->MergeFromCodedStream(&coded_stream) == false) {
+ FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a protobuf message from data.";
+ return -4;
+ }
+
+ // Make sure all the data was consumed.
+ if (coded_stream.ConsumedEntireMessage() == false) {
+ FB_LOG_EVERY_MS(ERROR, 1000) << "Orphaned data left after reading protobuf message";
+ return -5;
+ }
+
+ return coded_stream.CurrentPosition();
+}
+
+RpcSerde::RpcSerde() {}
+
+RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {}
+
+std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) {
+ auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
+ magic->append(2);
+ RWPrivateCursor c(magic.get());
+ c.skip(4);
+ // Version
+ c.write(RPC_VERSION);
+ if (secure) {
+ // for now support only KERBEROS (DIGEST is not supported)
+ c.write(KERBEROS_AUTH_TYPE);
+ } else {
+ c.write(DEFAULT_AUTH_TYPE);
+ }
+ return magic;
+}
+
+std::unique_ptr<IOBuf> RpcSerde::Header(const std::string &user) {
+ pb::ConnectionHeader h;
+
+ // TODO(eclark): Make this not a total lie.
+ h.mutable_user_info()->set_effective_user(user);
+ // The service name that we want to talk to.
+ //
+ // Right now we're completely ignoring the service interface.
+ // That may or may not be the correct thing to do.
+ // It worked for a while with the java client; until it
+ // didn't.
+ // TODO: send the service name and user from the RpcClient
+ h.set_service_name(INTERFACE);
+
+ std::unique_ptr<pb::VersionInfo> version_info = CreateVersionInfo();
+
+ h.set_allocated_version_info(version_info.release());
+
+ if (codec_ != nullptr) {
+ h.set_cell_block_codec_class(codec_->java_class_name());
+ }
+ return PrependLength(SerializeMessage(h));
+}
+
+/**
+ * Build the VersionInfo placed in the connection header from the compiled-in
+ * Version constants. version_major / version_minor are only populated when the
+ * version string splits into at least two '.'-separated parts.
+ */
+std::unique_ptr<pb::VersionInfo> RpcSerde::CreateVersionInfo() {
+ std::unique_ptr<pb::VersionInfo> version_info = std::make_unique<pb::VersionInfo>();
+ version_info->set_user(Version::user);
+ version_info->set_revision(Version::revision);
+ version_info->set_url(Version::url);
+ version_info->set_date(Version::date);
+ version_info->set_src_checksum(Version::src_checksum);
+ version_info->set_version(Version::version);
+
+ // Split "major.minor[.rest]" on dots; consecutive dots are treated as one separator.
+ std::string version{Version::version};
+ std::vector<std::string> version_parts;
+ boost::split(version_parts, version, boost::is_any_of("."), boost::token_compress_on);
+ if (version_parts.size() >= 2) {
+ // NOTE(review): folly::to presumably throws if a part is not a plain unsigned
+ // integer - confirm Version::version always starts with numeric major.minor.
+ version_info->set_version_major(folly::to<uint32_t>(version_parts[0]));
+ version_info->set_version_minor(folly::to<uint32_t>(version_parts[1]));
+ }
+
+ VLOG(1) << "Client VersionInfo:" << version_info->ShortDebugString();
+ return version_info;
+}
+
+std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const std::string &method,
+ const Message *msg) {
+ pb::RequestHeader rq;
+ rq.set_method_name(method);
+ rq.set_call_id(call_id);
+ rq.set_request_param(msg != nullptr);
+ auto ser_header = SerializeDelimited(rq);
+ if (msg != nullptr) {
+ auto ser_req = SerializeDelimited(*msg);
+ ser_header->appendChain(std::move(ser_req));
+ }
+
+ return PrependLength(std::move(ser_header));
+}
+
+/**
+ * Serialize a response for the given call id: a delimited ResponseHeader,
+ * followed by the delimited response message when one is supplied, with the
+ * whole chain prefixed by its 4-byte big-endian length.
+ *
+ * A null msg yields a header-only response rather than dereferencing a null
+ * pointer, matching the null handling in Request().
+ */
+std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
+ const google::protobuf::Message *msg) {
+ pb::ResponseHeader rh;
+ rh.set_call_id(call_id);
+ auto ser_header = SerializeDelimited(rh);
+ if (msg != nullptr) {
+ auto ser_resp = SerializeDelimited(*msg);
+ ser_header->appendChain(std::move(ser_resp));
+ }
+
+ return PrependLength(std::move(ser_header));
+}
+
+std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
+ uint32_t offset, uint32_t length) {
+ if (codec_ == nullptr) {
+ return nullptr;
+ }
+ return codec_->CreateDecoder(std::move(buf), offset, length);
+}
+
+std::unique_ptr<IOBuf> RpcSerde::PrependLength(std::unique_ptr<IOBuf> msg) {
+ // Java ints are 4 bytes long, so create a buffer that large.
+ auto len_buf = IOBuf::create(4);
+ // Then make those bytes visible.
+ len_buf->append(4);
+
+ RWPrivateCursor c(len_buf.get());
+ // Get the size of the data to be pushed out the network.
+ auto size = msg->computeChainDataLength();
+
+ // Write the length to this IOBuf.
+ c.writeBE(static_cast<uint32_t>(size));
+
+ // Then attach the original to the back of len_buf
+ len_buf->appendChain(std::move(msg));
+ return len_buf;
+}
+
+std::unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) {
+ // Get the buffer size needed for just the message.
+ int msg_size = msg.ByteSize();
+ int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
+
+ // Create a buffer big enough to hold the varint and the object.
+ auto buf = IOBuf::create(buf_size);
+ buf->append(buf_size);
+
+ // Create the array output stream.
+ ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
+ // Wrap the ArrayOutputStream in the coded output stream to allow writing
+ // Varint32
+ CodedOutputStream cos{&aos};
+
+ // Write out the size.
+ cos.WriteVarint32(msg_size);
+
+ // Now write the rest out.
+ // We're using the protobuf output streams here to keep track
+ // of where in the output array we are rather than IOBuf.
+ msg.SerializeWithCachedSizesToArray(cos.GetDirectBufferForNBytesAndAdvance(msg_size));
+
+ // Return the buffer.
+ return buf;
+}
+// TODO(eclark): Make this 1 copy.
+std::unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) {
+ auto buf = IOBuf::copyBuffer(msg.SerializeAsString());
+ return buf;
+}
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc-serde.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.h b/hbase-native-client/serde/rpc-serde.h
new file mode 100644
index 0000000..0e1d44e
--- /dev/null
+++ b/hbase-native-client/serde/rpc-serde.h
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "if/HBase.pb.h"
+#include "serde/cell-scanner.h"
+#include "serde/codec.h"
+
+// Forward
+namespace folly {
+class IOBuf;
+}
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
+namespace hbase {
+
+/**
+ * @brief Class for serializing and deserializing rpc formatted data.
+ *
+ * RpcSerde is the one stop shop for reading/writing data to HBase daemons.
+ * It should throw exceptions if anything goes wrong.
+ */
+class RpcSerde {
+ public:
+ RpcSerde();
+ /**
+ * Constructor assumes the default auth type.
+ */
+ RpcSerde(std::shared_ptr<Codec> codec);
+
+ /**
+ * Destructor. This is provided just for testing purposes.
+ */
+ virtual ~RpcSerde() = default;
+
+ /**
+ * Parse a message in the delimited format.
+ *
+ * A message in delimited format consists of the following:
+ *
+ * - a protobuf var int32.
+ * - A protobuf object serialized.
+ */
+ int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
+
+ /**
+ * Create a new connection preamble in a new IOBuf.
+ */
+ static std::unique_ptr<folly::IOBuf> Preamble(bool secure);
+
+ /**
+ * Create the header protobuf object and serialize it to a new IOBuf.
+ * Header is in the following format:
+ *
+ * - Big endian length
+ * - ConnectionHeader object serialized out.
+ */
+ std::unique_ptr<folly::IOBuf> Header(const std::string &user);
+
+ /**
+ * Take ownership of the passed buffer, and create a CellScanner using the
+ * Codec class to parse Cells out of the wire.
+ */
+ std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset,
+ uint32_t length);
+
+ /**
+ * Serialize a request message into a protobuf.
+ * Request consists of:
+ *
+ * - Big endian length
+ * - RequestHeader object
+ * - The passed in Message object
+ */
+ std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, const std::string &method,
+ const google::protobuf::Message *msg);
+
+ /**
+ * Serialize a response message into a protobuf.
+ * Response consists of:
+ *
+ * - Big endian length
+ * - ResponseHeader object
+ * - The passed in Message object
+ */
+ std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id,
+ const google::protobuf::Message *msg);
+
+ /**
+ * Serialize a message in the delimited format.
+ * Delimited format consists of the following:
+ *
+ * - A protobuf var int32
+ * - The message object serialized after that.
+ */
+ std::unique_ptr<folly::IOBuf> SerializeDelimited(const google::protobuf::Message &msg);
+
+ /**
+ * Serialize a message. This does not add a length prefix.
+ */
+ std::unique_ptr<folly::IOBuf> SerializeMessage(const google::protobuf::Message &msg);
+
+ /**
+ * Prepend a length IOBuf to the given IOBuf chain.
+ * This involves no copies or moves of the passed in data.
+ */
+ std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg);
+
+ public:
+ static constexpr const char *HBASE_CLIENT_RPC_TEST_MODE = "hbase.client.rpc.test.mode";
+ static constexpr const bool DEFAULT_HBASE_CLIENT_RPC_TEST_MODE = false;
+
+ private:
+ /* data */
+ std::shared_ptr<Codec> codec_;
+ std::unique_ptr<pb::VersionInfo> CreateVersionInfo();
+};
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc
deleted file mode 100644
index 957a317..0000000
--- a/hbase-native-client/serde/rpc.cc
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "serde/rpc.h"
-
-#include <folly/Conv.h>
-#include <folly/Logging.h>
-#include <folly/io/Cursor.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/message.h>
-#include <boost/algorithm/string.hpp>
-
-#include <utility>
-
-#include "if/RPC.pb.h"
-#include "utils/version.h"
-
-using folly::IOBuf;
-using folly::io::RWPrivateCursor;
-using google::protobuf::Message;
-using google::protobuf::io::ArrayInputStream;
-using google::protobuf::io::ArrayOutputStream;
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
-using google::protobuf::io::ZeroCopyOutputStream;
-
-namespace hbase {
-
-static const std::string PREAMBLE = "HBas";
-static const std::string INTERFACE = "ClientService";
-static const uint8_t RPC_VERSION = 0;
-static const uint8_t DEFAULT_AUTH_TYPE = 80;
-static const uint8_t KERBEROS_AUTH_TYPE = 81;
-
-int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
- if (buf == nullptr || msg == nullptr) {
- return -2;
- }
-
- DCHECK(!buf->isChained());
-
- ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())};
- CodedInputStream coded_stream{&ais};
-
- uint32_t msg_size;
-
- // Try and read the varint.
- if (coded_stream.ReadVarint32(&msg_size) == false) {
- FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
- return -3;
- }
-
- coded_stream.PushLimit(msg_size);
- // Parse the message.
- if (msg->MergeFromCodedStream(&coded_stream) == false) {
- FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a protobuf message from data.";
- return -4;
- }
-
- // Make sure all the data was consumed.
- if (coded_stream.ConsumedEntireMessage() == false) {
- FB_LOG_EVERY_MS(ERROR, 1000) << "Orphaned data left after reading protobuf message";
- return -5;
- }
-
- return coded_stream.CurrentPosition();
-}
-
-RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {}
-
-std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) {
- auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
- magic->append(2);
- RWPrivateCursor c(magic.get());
- c.skip(4);
- // Version
- c.write(RPC_VERSION);
- if (secure) {
- // for now support only KERBEROS (DIGEST is not supported)
- c.write(KERBEROS_AUTH_TYPE);
- } else {
- c.write(DEFAULT_AUTH_TYPE);
- }
- return magic;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::Header(const std::string &user) {
- pb::ConnectionHeader h;
-
- // TODO(eclark): Make this not a total lie.
- h.mutable_user_info()->set_effective_user(user);
- // The service name that we want to talk to.
- //
- // Right now we're completely ignoring the service interface.
- // That may or may not be the correct thing to do.
- // It worked for a while with the java client; until it
- // didn't.
- // TODO: send the service name and user from the RpcClient
- h.set_service_name(INTERFACE);
-
- std::unique_ptr<pb::VersionInfo> version_info = CreateVersionInfo();
-
- h.set_allocated_version_info(version_info.release());
-
- if (codec_ != nullptr) {
- h.set_cell_block_codec_class(codec_->java_class_name());
- }
- return PrependLength(SerializeMessage(h));
-}
-
-std::unique_ptr<pb::VersionInfo> RpcSerde::CreateVersionInfo() {
- std::unique_ptr<pb::VersionInfo> version_info = std::make_unique<pb::VersionInfo>();
- version_info->set_user(Version::user);
- version_info->set_revision(Version::revision);
- version_info->set_url(Version::url);
- version_info->set_date(Version::date);
- version_info->set_src_checksum(Version::src_checksum);
- version_info->set_version(Version::version);
-
- std::string version{Version::version};
- std::vector<std::string> version_parts;
- boost::split(version_parts, version, boost::is_any_of("."), boost::token_compress_on);
- uint32_t major_version = 0, minor_version = 0;
- if (version_parts.size() >= 2) {
- version_info->set_version_major(folly::to<uint32_t>(version_parts[0]));
- version_info->set_version_minor(folly::to<uint32_t>(version_parts[1]));
- }
-
- VLOG(1) << "Client VersionInfo:" << version_info->ShortDebugString();
- return version_info;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const std::string &method,
- const Message *msg) {
- pb::RequestHeader rq;
- rq.set_method_name(method);
- rq.set_call_id(call_id);
- rq.set_request_param(msg != nullptr);
- auto ser_header = SerializeDelimited(rq);
- if (msg != nullptr) {
- auto ser_req = SerializeDelimited(*msg);
- ser_header->appendChain(std::move(ser_req));
- }
-
- return PrependLength(std::move(ser_header));
-}
-
-std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
- uint32_t offset, uint32_t length) {
- if (codec_ == nullptr) {
- return nullptr;
- }
- return codec_->CreateDecoder(std::move(buf), offset, length);
-}
-
-std::unique_ptr<IOBuf> RpcSerde::PrependLength(std::unique_ptr<IOBuf> msg) {
- // Java ints are 4 long. So create a buffer that large
- auto len_buf = IOBuf::create(4);
- // Then make those bytes visible.
- len_buf->append(4);
-
- RWPrivateCursor c(len_buf.get());
- // Get the size of the data to be pushed out the network.
- auto size = msg->computeChainDataLength();
-
- // Write the length to this IOBuf.
- c.writeBE(static_cast<uint32_t>(size));
-
- // Then attach the origional to the back of len_buf
- len_buf->appendChain(std::move(msg));
- return len_buf;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) {
- // Get the buffer size needed for just the message.
- int msg_size = msg.ByteSize();
- int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
-
- // Create a buffer big enough to hold the varint and the object.
- auto buf = IOBuf::create(buf_size);
- buf->append(buf_size);
-
- // Create the array output stream.
- ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
- // Wrap the ArrayOuputStream in the coded output stream to allow writing
- // Varint32
- CodedOutputStream cos{&aos};
-
- // Write out the size.
- cos.WriteVarint32(msg_size);
-
- // Now write the rest out.
- // We're using the protobuf output streams here to keep track
- // of where in the output array we are rather than IOBuf.
- msg.SerializeWithCachedSizesToArray(cos.GetDirectBufferForNBytesAndAdvance(msg_size));
-
- // Return the buffer.
- return buf;
-}
-// TODO(eclark): Make this 1 copy.
-std::unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) {
- auto buf = IOBuf::copyBuffer(msg.SerializeAsString());
- return buf;
-}
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc.h
deleted file mode 100644
index 15aa1ee..0000000
--- a/hbase-native-client/serde/rpc.h
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <memory>
-#include <string>
-
-#include "if/HBase.pb.h"
-#include "serde/cell-scanner.h"
-#include "serde/codec.h"
-
-// Forward
-namespace folly {
-class IOBuf;
-}
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-namespace hbase {
-
-/**
- * @brief Class for serializing a deserializing rpc formatted data.
- *
- * RpcSerde is the one stop shop for reading/writing data to HBase daemons.
- * It should throw exceptions if anything goes wrong.
- */
-class RpcSerde {
- public:
- /**
- * Constructor assumes the default auth type.
- */
- RpcSerde(std::shared_ptr<Codec> codec);
-
- /**
- * Destructor. This is provided just for testing purposes.
- */
- virtual ~RpcSerde() = default;
-
- /**
- * Pase a message in the delimited format.
- *
- * A message in delimited format consists of the following:
- *
- * - a protobuf var int32.
- * - A protobuf object serialized.
- */
- int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
-
- /**
- * Create a new connection preamble in a new IOBuf.
- */
- static std::unique_ptr<folly::IOBuf> Preamble(bool secure);
-
- /**
- * Create the header protobuf object and serialize it to a new IOBuf.
- * Header is in the following format:
- *
- * - Big endian length
- * - ConnectionHeader object serialized out.
- */
- std::unique_ptr<folly::IOBuf> Header(const std::string &user);
-
- /**
- * Take ownership of the passed buffer, and create a CellScanner using the
- * Codec class to parse Cells out of the wire.
- */
- std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset,
- uint32_t length);
-
- /**
- * Serialize a request message into a protobuf.
- * Request consists of:
- *
- * - Big endian length
- * - RequestHeader object
- * - The passed in Message object
- */
- std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, const std::string &method,
- const google::protobuf::Message *msg);
-
- /**
- * Serialize a message in the delimited format.
- * Delimited format consists of the following:
- *
- * - A protobuf var int32
- * - The message object seriailized after that.
- */
- std::unique_ptr<folly::IOBuf> SerializeDelimited(const google::protobuf::Message &msg);
-
- /**
- * Serilalize a message. This does not add any length prepend.
- */
- std::unique_ptr<folly::IOBuf> SerializeMessage(const google::protobuf::Message &msg);
-
- /**
- * Prepend a length IOBuf to the given IOBuf chain.
- * This involves no copies or moves of the passed in data.
- */
- std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg);
-
- private:
- /* data */
- std::shared_ptr<Codec> codec_;
- std::unique_ptr<pb::VersionInfo> CreateVersionInfo();
-};
-} // namespace hbase