You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/08/01 19:04:18 UTC
hbase git commit: HBASE-18466. [C++] Support handling exception in
RpcTestServer
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 d6c720b8f -> f602e8a77
HBASE-18466. [C++] Support handling exception in RpcTestServer
Signed-off-by: Enis Soztutar <en...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f602e8a7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f602e8a7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f602e8a7
Branch: refs/heads/HBASE-14850
Commit: f602e8a772a62063a2b18542c4ed9b57bf6d1cfe
Parents: d6c720b
Author: Xiaobing Zhou <xz...@hortonworks.com>
Authored: Thu Jul 27 17:33:08 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Jul 31 19:22:24 2017 -0700
----------------------------------------------------------------------
.../connection/rpc-test-server-handler.cc | 7 +-
.../connection/rpc-test-server-handler.h | 2 +-
.../connection/rpc-test-server.cc | 31 ++++-
.../connection/rpc-test-server.h | 24 +++-
hbase-native-client/connection/rpc-test.cc | 129 ++++++++++++++-----
hbase-native-client/serde/rpc-serde.cc | 27 ++++
hbase-native-client/serde/rpc-serde.h | 14 ++
7 files changed, 194 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test-server-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
index 7d2f407..7f41b7e 100644
--- a/hbase-native-client/connection/rpc-test-server-handler.cc
+++ b/hbase-native-client/connection/rpc-test-server-handler.cc
@@ -46,16 +46,17 @@ void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IO
}
folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx,
- std::unique_ptr<Response> r) {
+ std::unique_ptr<Response> resp) {
VLOG(3) << "Writing RPC Request";
// Send the data down the pipeline.
- return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get()));
+ return ctx->fireWrite(
+ serde_.Response(resp->call_id(), resp->resp_msg().get(), resp->exception()));
}
std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
const std::string& method_name) {
std::unique_ptr<Request> result = nullptr;
- ;
+
if (method_name == "ping") {
result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
std::make_shared<EmptyResponseProto>(), method_name);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test-server-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h
index 4c84615..ab0264f 100644
--- a/hbase-native-client/connection/rpc-test-server-handler.h
+++ b/hbase-native-client/connection/rpc-test-server-handler.h
@@ -36,7 +36,7 @@ class RpcTestServerSerializeHandler
void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override;
- folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> r) override;
+ folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> resp) override;
private:
std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test-server.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
index d3a30b1..b9e1f13 100644
--- a/hbase-native-client/connection/rpc-test-server.cc
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -30,19 +30,35 @@ namespace hbase {
RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline(
std::shared_ptr<AsyncTransportWrapper> sock) {
+ if (service_ == nullptr) {
+ initService(sock);
+ }
+ CHECK(service_ != nullptr);
+
auto pipeline = RpcTestServerSerializePipeline::create();
pipeline->addBack(AsyncSocketHandler(sock));
// ensure we can write from any thread
pipeline->addBack(EventBaseHandler());
pipeline->addBack(LengthFieldBasedFrameDecoder());
pipeline->addBack(RpcTestServerSerializeHandler());
- pipeline->addBack(
- MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(&service_));
+ pipeline->addBack(MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(
+ service_.get()));
pipeline->finalize();
return pipeline;
}
+void RpcTestServerPipelineFactory::initService(std::shared_ptr<AsyncTransportWrapper> sock) {
+ /* get server address */
+ SocketAddress localAddress;
+ sock->getLocalAddress(&localAddress);
+
+ /* init service with server address */
+ service_ = std::make_shared<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>>(
+ std::make_shared<CPUThreadPoolExecutor>(1),
+ std::make_shared<RpcTestService>(std::make_shared<SocketAddress>(localAddress)));
+}
+
Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) {
/* build Response */
auto response = std::make_unique<Response>();
@@ -54,11 +70,20 @@ Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Req
response->set_resp_msg(pb_resp_msg);
} else if (method_name == "echo") {
auto pb_resp_msg = std::make_shared<EchoResponseProto>();
+ /* get msg from client */
auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
pb_resp_msg->set_message(pb_req_msg->message());
response->set_resp_msg(pb_resp_msg);
+ VLOG(1) << "RPC server:"
+ << " echo called, " << pb_req_msg->message();
+
} else if (method_name == "error") {
- // TODO:
+ auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+ response->set_resp_msg(pb_resp_msg);
+ VLOG(1) << "RPC server:"
+ << " error called.";
+ response->set_exception(folly::make_exception_wrapper<RpcTestException>("server error!"));
+
} else if (method_name == "pause") {
// TODO:
} else if (method_name == "addr") {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test-server.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h
index c3225ff..955560e 100644
--- a/hbase-native-client/connection/rpc-test-server.h
+++ b/hbase-native-client/connection/rpc-test-server.h
@@ -17,12 +17,14 @@
*
*/
#pragma once
+#include <folly/SocketAddress.h>
#include <wangle/concurrent/CPUThreadPoolExecutor.h>
#include <wangle/service/ExecutorFilter.h>
#include <wangle/service/Service.h>
#include "connection/request.h"
#include "connection/response.h"
+#include "exceptions/exception.h"
using namespace hbase;
using namespace folly;
@@ -31,11 +33,24 @@ using namespace wangle;
namespace hbase {
using RpcTestServerSerializePipeline = wangle::Pipeline<IOBufQueue&, std::unique_ptr<Response>>;
+class RpcTestException : public IOException {
+ public:
+ RpcTestException() {}
+ RpcTestException(const std::string& what) : IOException(what) {}
+ RpcTestException(const std::string& what, const folly::exception_wrapper& cause)
+ : IOException(what, cause) {}
+ RpcTestException(const folly::exception_wrapper& cause) : IOException("", cause) {}
+};
+
class RpcTestService : public Service<std::unique_ptr<Request>, std::unique_ptr<Response>> {
public:
- RpcTestService() {}
+ RpcTestService(std::shared_ptr<folly::SocketAddress> socket_address)
+ : socket_address_(socket_address) {}
virtual ~RpcTestService() = default;
Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> request) override;
+
+ private:
+ std::shared_ptr<folly::SocketAddress> socket_address_;
};
class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSerializePipeline> {
@@ -44,7 +59,10 @@ class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSeriali
std::shared_ptr<AsyncTransportWrapper> sock) override;
private:
- ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>> service_{
- std::make_shared<CPUThreadPoolExecutor>(1), std::make_shared<RpcTestService>()};
+ void initService(std::shared_ptr<AsyncTransportWrapper> sock);
+
+ private:
+ std::shared_ptr<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>> service_{
+ nullptr};
};
} // end of namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
index d4cd89f..2949fe9 100644
--- a/hbase-native-client/connection/rpc-test.cc
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -30,6 +30,7 @@
#include <boost/thread.hpp>
#include "connection/rpc-client.h"
+#include "exceptions/exception.h"
#include "if/test.pb.h"
#include "rpc-test-server.h"
#include "security/user.h"
@@ -40,46 +41,114 @@ using namespace folly;
using namespace hbase;
DEFINE_int32(port, 0, "test server port");
+typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap;
+typedef std::shared_ptr<ServerTestBootstrap> ServerPtr;
-TEST(RpcTestServer, echo) {
- /* create conf */
+class RpcTest : public ::testing::Test {
+ public:
+ static void SetUpTestCase() { google::InstallFailureSignalHandler(); }
+};
+
+std::shared_ptr<Configuration> CreateConf() {
auto conf = std::make_shared<Configuration>();
conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true");
+ return conf;
+}
+ServerPtr CreateRpcServer() {
/* create rpc test server */
- auto server = std::make_shared<ServerBootstrap<RpcTestServerSerializePipeline>>();
+ auto server = std::make_shared<ServerTestBootstrap>();
server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>());
server->bind(FLAGS_port);
- folly::SocketAddress server_addr;
- server->getSockets()[0]->getAddress(&server_addr);
+ return server;
+}
- /* create RpcClient */
+std::shared_ptr<folly::SocketAddress> GetRpcServerAddress(ServerPtr server) {
+ auto addr = std::make_shared<folly::SocketAddress>();
+ server->getSockets()[0]->getAddress(addr.get());
+ return addr;
+}
+
+std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf) {
auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+ auto client = std::make_shared<RpcClient>(io_executor, nullptr, conf);
+ return client;
+}
+
+std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf,
+ std::chrono::nanoseconds connect_timeout) {
+ auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+ auto client = std::make_shared<RpcClient>(io_executor, nullptr, conf, connect_timeout);
+ return client;
+}
+
+/**
+ * test echo
+ */
+TEST_F(RpcTest, Echo) {
+ auto conf = CreateConf();
+ auto server = CreateRpcServer();
+ auto server_addr = GetRpcServerAddress(server);
+ auto client = CreateRpcClient(conf);
+
+ std::string greetings = "hello, hbase server!";
+ auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+ std::make_shared<EchoResponseProto>(), "echo");
+ auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+ pb_msg->set_message(greetings);
+
+ /* sending out request */
+ client
+ ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
+ hbase::security::User::defaultUser())
+ .then([=](std::unique_ptr<Response> response) {
+ auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
+ EXPECT_TRUE(pb_resp != nullptr);
+ VLOG(1) << "RPC echo returned: " + pb_resp->message();
+ EXPECT_EQ(greetings, pb_resp->message());
+ })
+ .onError([](const folly::exception_wrapper& ew) {
+ FAIL() << "Shouldn't get here, no exception is expected for RPC echo.";
+ });
+
+ server->stop();
+ server->join();
+}
+
+/**
+ * test error
+ */
+TEST_F(RpcTest, error) {
+ auto conf = CreateConf();
+ auto server = CreateRpcServer();
+ auto server_addr = GetRpcServerAddress(server);
+ auto client = CreateRpcClient(conf);
+
+ auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+ std::make_shared<EmptyResponseProto>(), "error");
+ /* sending out request */
+ client
+ ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
+ hbase::security::User::defaultUser())
+ .then([=](std::unique_ptr<Response> response) {
+ FAIL() << "Shouldn't get here, exception is expected for RPC error.";
+ })
+ .onError([](const folly::exception_wrapper& ew) {
+ VLOG(1) << "RPC error returned with exception.";
+ std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString();
+ std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString();
+
+ /* verify exception_wrapper */
+ EXPECT_TRUE(bool(ew));
+ EXPECT_EQ(kRemoteException, ew.class_name());
- auto rpc_client = std::make_shared<RpcClient>(io_executor, nullptr, conf);
-
- /**
- * test echo
- */
- try {
- std::string greetings = "hello, hbase server!";
- auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
- std::make_shared<EchoResponseProto>(), "echo");
- auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
- pb_msg->set_message(greetings);
-
- /* sending out request */
- rpc_client
- ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request),
- hbase::security::User::defaultUser())
- .then([=](std::unique_ptr<Response> response) {
- auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
- VLOG(1) << "message returned: " + pb_resp->message();
- EXPECT_EQ(greetings, pb_resp->message());
- });
- } catch (const std::exception& e) {
- throw e;
- }
+ /* verify RemoteException */
+ EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& re) {
+ /* verify DoNotRetryIOException*/
+ EXPECT_EQ(kRpcTestException, re.exception_class_name());
+ EXPECT_EQ(kRpcTestException + ": server error!", re.stack_trace());
+ }));
+ });
server->stop();
server->join();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/serde/rpc-serde.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.cc b/hbase-native-client/serde/rpc-serde.cc
index 9e1f79a..70a57e8 100644
--- a/hbase-native-client/serde/rpc-serde.cc
+++ b/hbase-native-client/serde/rpc-serde.cc
@@ -40,6 +40,8 @@ using google::protobuf::io::CodedInputStream;
using google::protobuf::io::CodedOutputStream;
using google::protobuf::io::ZeroCopyOutputStream;
+using namespace hbase::pb;
+
namespace hbase {
static const std::string PREAMBLE = "HBas";
@@ -174,6 +176,31 @@ std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
return PrependLength(std::move(ser_header));
}
+std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
+ const google::protobuf::Message *msg,
+ const folly::exception_wrapper &exception) {
+ /* create ResponseHeader */
+ pb::ResponseHeader rh;
+ rh.set_call_id(call_id);
+
+ /* create ExceptionResponse */
+ if (bool(exception)) {
+ VLOG(1) << "packing ExceptionResponse";
+ auto exception_response = new pb::ExceptionResponse();
+ exception_response->set_exception_class_name(exception.class_name().c_str());
+ exception_response->set_stack_trace(exception.what().c_str());
+ rh.set_allocated_exception(exception_response);
+ }
+
+ /* serialize Response header and body */
+ auto ser_header = SerializeDelimited(rh);
+ auto ser_resp = SerializeDelimited(*msg);
+ ser_header->appendChain(std::move(ser_resp));
+
+ VLOG(3) << "Converted hbase::Response to folly::IOBuf";
+ return PrependLength(std::move(ser_header));
+}
+
std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
uint32_t offset, uint32_t length) {
if (codec_ == nullptr) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/serde/rpc-serde.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.h b/hbase-native-client/serde/rpc-serde.h
index 0e1d44e..6941f62 100644
--- a/hbase-native-client/serde/rpc-serde.h
+++ b/hbase-native-client/serde/rpc-serde.h
@@ -21,10 +21,12 @@
#include <memory>
#include <string>
+#include <folly/ExceptionWrapper.h>
#include "if/HBase.pb.h"
#include "serde/cell-scanner.h"
#include "serde/codec.h"
+using namespace folly;
// Forward
namespace folly {
class IOBuf;
@@ -110,6 +112,18 @@ class RpcSerde {
const google::protobuf::Message *msg);
/**
+ * Serialize a response message into a protobuf.
+ * Request consists of:
+ *
+ * - Big endian length
+ * - ResponseHeader object
+ * - The passed in hbase::Response object
+ */
+ std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id,
+ const google::protobuf::Message *msg,
+ const folly::exception_wrapper &exception);
+
+ /**
* Serialize a message in the delimited format.
* Delimited format consists of the following:
*