You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:31 UTC
[hbase] 103/133: HBASE-18338 [C++] Implement RpcTestServer
(Xiaobing Zhou)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit da28b3e214fbb867d7ad3202632d5d3528ca1bed
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Jul 21 16:29:44 2017 -0700
HBASE-18338 [C++] Implement RpcTestServer (Xiaobing Zhou)
---
hbase-native-client/connection/BUCK | 13 ++++
hbase-native-client/connection/client-handler.cc | 21 ++++--
hbase-native-client/connection/client-handler.h | 7 +-
hbase-native-client/connection/pipeline.cc | 12 ++-
.../connection/rpc-test-server-handler.cc | 77 +++++++++++++++++++
.../connection/rpc-test-server-handler.h | 47 ++++++++++++
hbase-native-client/connection/rpc-test-server.cc | 70 ++++++++++++++++++
hbase-native-client/connection/rpc-test-server.h | 50 +++++++++++++
hbase-native-client/connection/rpc-test.cc | 86 ++++++++++++++++++++++
hbase-native-client/connection/sasl-handler.h | 2 +-
hbase-native-client/if/test.proto | 43 +++++++++++
hbase-native-client/if/test_rpc_service.proto | 35 +++++++++
hbase-native-client/serde/BUCK | 4 +-
.../serde/client-deserializer-test.cc | 3 +-
.../serde/client-serializer-test.cc | 2 +-
hbase-native-client/serde/{rpc.cc => rpc-serde.cc} | 16 +++-
hbase-native-client/serde/{rpc.h => rpc-serde.h} | 16 ++++
17 files changed, 482 insertions(+), 22 deletions(-)
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index c3119eb..aaf8fdb 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -33,6 +33,8 @@ cxx_library(
"service.h",
"rpc-client.h",
"sasl-util.h",
+ "rpc-test-server.h",
+ "rpc-test-server-handler.h",
],
srcs=[
"client-dispatcher.cc",
@@ -44,6 +46,8 @@ cxx_library(
"rpc-client.cc",
"sasl-handler.cc",
"sasl-util.cc",
+ "rpc-test-server.cc",
+ "rpc-test-server-handler.cc",
],
deps=[
"//if:if",
@@ -68,3 +72,12 @@ cxx_test(
deps=[
":connection",
],)
+cxx_test(
+ name="rpc-test",
+ srcs=[
+ "rpc-test.cc",
+ ],
+ deps=[
+ ":connection",
+ ],
+ run_test_separately=True,)
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 052c171..39227d3 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -35,9 +35,10 @@ using google::protobuf::Message;
namespace hbase {
ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
- const std::string &server)
+ std::shared_ptr<Configuration> conf, const std::string &server)
: user_name_(user_name),
serde_(codec),
+ conf_(conf),
server_(server),
once_flag_(std::make_unique<std::once_flag>()),
resp_msgs_(
@@ -115,13 +116,17 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
}
folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
- // We need to send the header once.
- // So use call_once to make sure that only one thread wins this.
- std::call_once((*once_flag_), [ctx, this]() {
- VLOG(3) << "Writing RPC Header to server: " << server_;
- auto header = serde_.Header(user_name_);
- ctx->fireWrite(std::move(header));
- });
+ /* for RPC test, there's no need to send connection header */
+ if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+ RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+ // We need to send the header once.
+ // So use call_once to make sure that only one thread wins this.
+ std::call_once((*once_flag_), [ctx, this]() {
+ VLOG(3) << "Writing RPC Header to server: " << server_;
+ auto header = serde_.Header(user_name_);
+ ctx->fireWrite(std::move(header));
+ });
+ }
VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_;
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index 8de3a8b..b6f19a2 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -26,9 +26,10 @@
#include <string>
#include <utility>
+#include "core/configuration.h"
#include "exceptions/exception.h"
#include "serde/codec.h"
-#include "serde/rpc.h"
+#include "serde/rpc-serde.h"
#include "utils/concurrent-map.h"
// Forward decs.
@@ -60,7 +61,8 @@ class ClientHandler
* Create the handler
* @param user_name the user name of the user running this process.
*/
- ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, const std::string &server);
+ ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
+ std::shared_ptr<Configuration> conf, const std::string &server);
/**
* Get bytes from the wire.
@@ -79,6 +81,7 @@ class ClientHandler
std::string user_name_;
RpcSerde serde_;
std::string server_; // for logging
+ std::shared_ptr<Configuration> conf_;
// in flight requests
std::unique_ptr<concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>> resp_msgs_;
diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc
index 2844752..9c790b6 100644
--- a/hbase-native-client/connection/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -32,7 +32,6 @@ namespace hbase {
RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec,
std::shared_ptr<Configuration> conf)
: user_util_(), codec_(codec), conf_(conf) {}
-
SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
std::shared_ptr<folly::AsyncTransportWrapper> sock) {
folly::SocketAddress addr; // for logging
@@ -41,10 +40,15 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
auto pipeline = SerializePipeline::create();
pipeline->addBack(wangle::AsyncSocketHandler{sock});
pipeline->addBack(wangle::EventBaseHandler{});
- auto secure = security::User::IsSecurityEnabled(*conf_);
- pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+ bool secure = false;
+ /* for RPC test, there's no need to setup Sasl */
+ if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+ RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+ secure = security::User::IsSecurityEnabled(*conf_);
+ pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+ }
pipeline->addBack(wangle::LengthFieldBasedFrameDecoder{});
- pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, addr.describe()});
+ pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, conf_, addr.describe()});
pipeline->finalize();
return pipeline;
}
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
new file mode 100644
index 0000000..7d2f407
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server-handler.cc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/rpc-test-server-handler.h"
+#include "if/RPC.pb.h"
+#include "if/test.pb.h"
+
+namespace hbase {
+
+void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) {
+ buf->coalesce();
+ pb::RequestHeader header;
+
+ int used_bytes = serde_.ParseDelimited(buf.get(), &header);
+ VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" << header.call_id();
+
+ auto received = CreateReceivedRequest(header.method_name());
+
+ buf->trimStart(used_bytes);
+ if (header.has_request_param() && received != nullptr) {
+ used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get());
+ VLOG(3) << "Read RPCRequest, buf length:" << buf->length()
+ << ", header PB length:" << used_bytes;
+ received->set_call_id(header.call_id());
+ }
+
+ if (received != nullptr) {
+ ctx->fireRead(std::move(received));
+ }
+}
+
+folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx,
+ std::unique_ptr<Response> r) {
+ VLOG(3) << "Writing RPC Request";
+ // Send the data down the pipeline.
+ return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get()));
+}
+
+std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
+ const std::string& method_name) {
+ std::unique_ptr<Request> result = nullptr;
+ ;
+ if (method_name == "ping") {
+ result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+ std::make_shared<EmptyResponseProto>(), method_name);
+ } else if (method_name == "echo") {
+ result = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+ std::make_shared<EchoResponseProto>(), method_name);
+ } else if (method_name == "error") {
+ result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+ std::make_shared<EmptyResponseProto>(), method_name);
+ } else if (method_name == "pause") {
+ result = std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
+ std::make_shared<EmptyResponseProto>(), method_name);
+ } else if (method_name == "addr") {
+ result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+ std::make_shared<AddrResponseProto>(), method_name);
+ }
+ return result;
+}
+} // end of namespace hbase
diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h
new file mode 100644
index 0000000..4c84615
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server-handler.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <wangle/channel/Handler.h>
+
+#include "connection/request.h"
+#include "connection/response.h"
+#include "serde/rpc-serde.h"
+
+using namespace hbase;
+
+namespace hbase {
+// A real rpc server would probably use generated client/server stubs
+class RpcTestServerSerializeHandler
+ : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Request>,
+ std::unique_ptr<Response>, std::unique_ptr<folly::IOBuf>> {
+ public:
+ RpcTestServerSerializeHandler() : serde_() {}
+
+ void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override;
+
+ folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> r) override;
+
+ private:
+ std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name);
+
+ private:
+ hbase::RpcSerde serde_;
+};
+} // end of namespace hbase
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
new file mode 100644
index 0000000..d3a30b1
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/codec/LengthFieldBasedFrameDecoder.h>
+#include <wangle/codec/LengthFieldPrepender.h>
+#include <wangle/service/ServerDispatcher.h>
+
+#include "connection/rpc-test-server-handler.h"
+#include "connection/rpc-test-server.h"
+#include "if/test.pb.h"
+
+namespace hbase {
+
+RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline(
+ std::shared_ptr<AsyncTransportWrapper> sock) {
+ auto pipeline = RpcTestServerSerializePipeline::create();
+ pipeline->addBack(AsyncSocketHandler(sock));
+ // ensure we can write from any thread
+ pipeline->addBack(EventBaseHandler());
+ pipeline->addBack(LengthFieldBasedFrameDecoder());
+ pipeline->addBack(RpcTestServerSerializeHandler());
+ pipeline->addBack(
+ MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(&service_));
+ pipeline->finalize();
+
+ return pipeline;
+}
+
+Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) {
+ /* build Response */
+ auto response = std::make_unique<Response>();
+ response->set_call_id(request->call_id());
+ std::string method_name = request->method();
+
+ if (method_name == "ping") {
+ auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+ response->set_resp_msg(pb_resp_msg);
+ } else if (method_name == "echo") {
+ auto pb_resp_msg = std::make_shared<EchoResponseProto>();
+ auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+ pb_resp_msg->set_message(pb_req_msg->message());
+ response->set_resp_msg(pb_resp_msg);
+ } else if (method_name == "error") {
+ // TODO:
+ } else if (method_name == "pause") {
+ // TODO:
+ } else if (method_name == "addr") {
+ // TODO:
+ }
+
+ return folly::makeFuture<std::unique_ptr<Response>>(std::move(response));
+}
+} // namespace hbase
diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h
new file mode 100644
index 0000000..c3225ff
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <wangle/service/ExecutorFilter.h>
+#include <wangle/service/Service.h>
+
+#include "connection/request.h"
+#include "connection/response.h"
+
+using namespace hbase;
+using namespace folly;
+using namespace wangle;
+
+namespace hbase {
+using RpcTestServerSerializePipeline = wangle::Pipeline<IOBufQueue&, std::unique_ptr<Response>>;
+
+class RpcTestService : public Service<std::unique_ptr<Request>, std::unique_ptr<Response>> {
+ public:
+ RpcTestService() {}
+ virtual ~RpcTestService() = default;
+ Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> request) override;
+};
+
+class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSerializePipeline> {
+ public:
+ RpcTestServerSerializePipeline::Ptr newPipeline(
+ std::shared_ptr<AsyncTransportWrapper> sock) override;
+
+ private:
+ ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>> service_{
+ std::make_shared<CPUThreadPoolExecutor>(1), std::make_shared<RpcTestService>()};
+};
+} // end of namespace hbase
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
new file mode 100644
index 0000000..d4cd89f
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <wangle/bootstrap/ClientBootstrap.h>
+#include <wangle/channel/Handler.h>
+
+#include <folly/Logging.h>
+#include <folly/SocketAddress.h>
+#include <folly/String.h>
+#include <folly/experimental/TestUtil.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <boost/thread.hpp>
+
+#include "connection/rpc-client.h"
+#include "if/test.pb.h"
+#include "rpc-test-server.h"
+#include "security/user.h"
+#include "serde/rpc-serde.h"
+
+using namespace wangle;
+using namespace folly;
+using namespace hbase;
+
+DEFINE_int32(port, 0, "test server port");
+
+TEST(RpcTestServer, echo) {
+ /* create conf */
+ auto conf = std::make_shared<Configuration>();
+ conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true");
+
+ /* create rpc test server */
+ auto server = std::make_shared<ServerBootstrap<RpcTestServerSerializePipeline>>();
+ server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>());
+ server->bind(FLAGS_port);
+ folly::SocketAddress server_addr;
+ server->getSockets()[0]->getAddress(&server_addr);
+
+ /* create RpcClient */
+ auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+
+ auto rpc_client = std::make_shared<RpcClient>(io_executor, nullptr, conf);
+
+ /**
+ * test echo
+ */
+ try {
+ std::string greetings = "hello, hbase server!";
+ auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+ std::make_shared<EchoResponseProto>(), "echo");
+ auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+ pb_msg->set_message(greetings);
+
+ /* sending out request */
+ rpc_client
+ ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request),
+ hbase::security::User::defaultUser())
+ .then([=](std::unique_ptr<Response> response) {
+ auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
+ VLOG(1) << "message returned: " + pb_resp->message();
+ EXPECT_EQ(greetings, pb_resp->message());
+ });
+ } catch (const std::exception& e) {
+ throw e;
+ }
+
+ server->stop();
+ server->join();
+}
diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h
index f606a23..81f4e81 100644
--- a/hbase-native-client/connection/sasl-handler.h
+++ b/hbase-native-client/connection/sasl-handler.h
@@ -30,7 +30,7 @@
#include "connection/sasl-util.h"
#include "connection/service.h"
#include "security/user.h"
-#include "serde/rpc.h"
+#include "serde/rpc-serde.h"
namespace hbase {
diff --git a/hbase-native-client/if/test.proto b/hbase-native-client/if/test.proto
new file mode 100644
index 0000000..72b68e9
--- /dev/null
+++ b/hbase-native-client/if/test.proto
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestProtos";
+option java_generate_equals_and_hash = true;
+
+message EmptyRequestProto {
+}
+
+message EmptyResponseProto {
+}
+
+message EchoRequestProto {
+ required string message = 1;
+}
+
+message EchoResponseProto {
+ required string message = 1;
+}
+
+message PauseRequestProto {
+ required uint32 ms = 1;
+}
+
+message AddrResponseProto {
+ required string addr = 1;
+}
diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto
new file mode 100644
index 0000000..5f91dc4
--- /dev/null
+++ b/hbase-native-client/if/test_rpc_service.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestRpcServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "test.proto";
+
+
+/**
+ * A protobuf service for use in tests
+ */
+service TestProtobufRpcProto {
+ rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
+ rpc echo(EchoRequestProto) returns (EchoResponseProto);
+ rpc error(EmptyRequestProto) returns (EmptyResponseProto);
+ rpc pause(PauseRequestProto) returns (EmptyResponseProto);
+ rpc addr(EmptyRequestProto) returns (AddrResponseProto);
+}
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
index 18e949c..a765884 100644
--- a/hbase-native-client/serde/BUCK
+++ b/hbase-native-client/serde/BUCK
@@ -22,13 +22,13 @@ cxx_library(
"cell-outputstream.h",
"codec.h",
"region-info.h",
- "rpc.h",
+ "rpc-serde.h",
"server-name.h",
"table-name.h",
"zk.h",
],
srcs=[
- "rpc.cc",
+ "rpc-serde.cc",
"zk.cc",
],
deps=[
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
index 054684d..1856047 100644
--- a/hbase-native-client/serde/client-deserializer-test.cc
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -16,12 +16,11 @@
* limitations under the License.
*
*/
-#include "serde/rpc.h"
-
#include <folly/io/IOBuf.h>
#include <gtest/gtest.h>
#include "if/Client.pb.h"
+#include "rpc-serde.h"
using namespace hbase;
using folly::IOBuf;
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
index 7d8b29c..306f2c2 100644
--- a/hbase-native-client/serde/client-serializer-test.cc
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -24,7 +24,7 @@
#include "if/HBase.pb.h"
#include "if/RPC.pb.h"
-#include "serde/rpc.h"
+#include "rpc-serde.h"
using namespace hbase;
using namespace hbase::pb;
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc-serde.cc
similarity index 94%
rename from hbase-native-client/serde/rpc.cc
rename to hbase-native-client/serde/rpc-serde.cc
index 957a317..9e1f79a 100644
--- a/hbase-native-client/serde/rpc.cc
+++ b/hbase-native-client/serde/rpc-serde.cc
@@ -17,8 +17,6 @@
*
*/
-#include "serde/rpc.h"
-
#include <folly/Conv.h>
#include <folly/Logging.h>
#include <folly/io/Cursor.h>
@@ -30,6 +28,7 @@
#include <utility>
#include "if/RPC.pb.h"
+#include "rpc-serde.h"
#include "utils/version.h"
using folly::IOBuf;
@@ -83,6 +82,8 @@ int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
return coded_stream.CurrentPosition();
}
+RpcSerde::RpcSerde() {}
+
RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {}
std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) {
@@ -162,6 +163,17 @@ std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const std::stri
return PrependLength(std::move(ser_header));
}
+std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
+ const google::protobuf::Message *msg) {
+ pb::ResponseHeader rh;
+ rh.set_call_id(call_id);
+ auto ser_header = SerializeDelimited(rh);
+ auto ser_resp = SerializeDelimited(*msg);
+ ser_header->appendChain(std::move(ser_resp));
+
+ return PrependLength(std::move(ser_header));
+}
+
std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
uint32_t offset, uint32_t length) {
if (codec_ == nullptr) {
diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc-serde.h
similarity index 87%
rename from hbase-native-client/serde/rpc.h
rename to hbase-native-client/serde/rpc-serde.h
index 15aa1ee..0e1d44e 100644
--- a/hbase-native-client/serde/rpc.h
+++ b/hbase-native-client/serde/rpc-serde.h
@@ -45,6 +45,7 @@ namespace hbase {
*/
class RpcSerde {
public:
+ RpcSerde();
/**
* Constructor assumes the default auth type.
*/
@@ -98,6 +99,17 @@ class RpcSerde {
const google::protobuf::Message *msg);
/**
+ * Serialize a response message into a protobuf.
+ * Request consists of:
+ *
+ * - Big endian length
+ * - ResponseHeader object
+ * - The passed in Message object
+ */
+ std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id,
+ const google::protobuf::Message *msg);
+
+ /**
* Serialize a message in the delimited format.
* Delimited format consists of the following:
*
@@ -117,6 +129,10 @@ class RpcSerde {
*/
std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg);
+ public:
+ static constexpr const char *HBASE_CLIENT_RPC_TEST_MODE = "hbase.client.rpc.test.mode";
+ static constexpr const bool DEFAULT_HBASE_CLIENT_RPC_TEST_MODE = false;
+
private:
/* data */
std::shared_ptr<Codec> codec_;