You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/08/08 02:29:51 UTC
hbase git commit: HBASE-18078. [C++] Harden RPC by handling various
communication abnormalities
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 65aa54885 -> 5261c67b5
HBASE-18078. [C++] Harden RPC by handling various communication abnormalities
Signed-off-by: Enis Soztutar <en...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5261c67b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5261c67b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5261c67b
Branch: refs/heads/HBASE-14850
Commit: 5261c67b5950cec81ff0d29bc58e3a7db3985068
Parents: 65aa548
Author: Xiaobing Zhou <xz...@hortonworks.com>
Authored: Mon Aug 7 17:44:59 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Aug 7 19:28:17 2017 -0700
----------------------------------------------------------------------
.../connection/client-dispatcher.cc | 16 +++-
.../connection/client-handler.cc | 14 +++-
.../connection/connection-factory.cc | 29 ++++---
hbase-native-client/connection/rpc-client.cc | 49 +++++++++++-
hbase-native-client/connection/rpc-client.h | 8 ++
.../connection/rpc-test-server-handler.cc | 3 +
.../connection/rpc-test-server.cc | 3 +
hbase-native-client/connection/rpc-test.cc | 84 ++++++++++++++++----
hbase-native-client/exceptions/exception.h | 16 +++-
hbase-native-client/if/test_rpc_service.proto | 1 +
10 files changed, 186 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/connection/client-dispatcher.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
index b9b2c34..d5d7f5f 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -18,8 +18,10 @@
*/
#include "connection/client-dispatcher.h"
#include <folly/ExceptionWrapper.h>
-
+#include <folly/Format.h>
+#include <folly/io/async/AsyncSocketException.h>
#include <utility>
+#include "exceptions/exception.h"
using std::unique_ptr;
@@ -31,6 +33,9 @@ void ClientDispatcher::read(Context *ctx, unique_ptr<Response> in) {
auto call_id = in->call_id();
auto p = requests_.find_and_erase(call_id);
+ VLOG(3) << folly::sformat("Read hbase::Response, call_id: {}, hasException: {}, what: {}",
+ in->call_id(), bool(in->exception()), in->exception().what());
+
if (in->exception()) {
p.setException(in->exception());
} else {
@@ -51,7 +56,14 @@ folly::Future<unique_ptr<Response>> ClientDispatcher::operator()(unique_ptr<Requ
LOG(ERROR) << "e = " << call_id;
this->requests_.erase(call_id);
});
- this->pipeline_->write(std::move(arg));
+
+ try {
+ this->pipeline_->write(std::move(arg));
+ } catch (const folly::AsyncSocketException &e) {
+ p.setException(folly::exception_wrapper{ConnectionException{folly::exception_wrapper{e}}});
+ /* clear folly::Promise to avoid overflow. */
+ requests_.erase(call_id);
+ }
return f;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 39227d3..983a68c 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -21,8 +21,8 @@
#include <folly/ExceptionWrapper.h>
#include <folly/Likely.h>
+#include <folly/io/async/AsyncSocketException.h>
#include <glog/logging.h>
-
#include <string>
#include "connection/request.h"
@@ -95,7 +95,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
: "";
std::string stack_trace =
exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : "";
- what.append(exception_class_name).append(stack_trace);
+ what.append(stack_trace);
auto remote_exception = std::make_unique<RemoteException>(what);
remote_exception->set_exception_class_name(exception_class_name)
@@ -133,7 +133,13 @@ folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Re
// Now store the call id to response.
resp_msgs_->insert(std::make_pair(r->call_id(), r->resp_msg()));
- // Send the data down the pipeline.
- return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
+  try {
+    // Send the data down the pipeline.
+    return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
+  } catch (const folly::AsyncSocketException &) {
+    /* drop the response message stored above so that failed writes do not accumulate entries. */
+    resp_msgs_->erase(r->call_id());
+    throw;  // rethrow the in-flight exception itself; `throw e;` would throw a (sliced) copy
+  }
}
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index a0c7f96..e763c03 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -22,11 +22,16 @@
#include <chrono>
+#include <folly/ExceptionWrapper.h>
+#include <folly/SocketAddress.h>
+#include <folly/io/async/AsyncSocketException.h>
+
#include "connection/client-dispatcher.h"
#include "connection/connection-factory.h"
#include "connection/pipeline.h"
#include "connection/sasl-handler.h"
#include "connection/service.h"
+#include "exceptions/exception.h"
using std::chrono::milliseconds;
using std::chrono::nanoseconds;
@@ -56,15 +61,19 @@ std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::M
std::shared_ptr<HBaseService> ConnectionFactory::Connect(
std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client, const std::string &hostname,
uint16_t port) {
- // Yes this will block however it makes dealing with connection pool soooooo
- // much nicer.
- // TODO see about using shared promise for this.
- auto pipeline = client
- ->connect(folly::SocketAddress(hostname, port, true),
- std::chrono::duration_cast<milliseconds>(connect_timeout_))
- .get();
- auto dispatcher = std::make_shared<ClientDispatcher>();
- dispatcher->setPipeline(pipeline);
- return dispatcher;
+ try {
+ // Yes this will block however it makes dealing with connection pool soooooo
+ // much nicer.
+ // TODO see about using shared promise for this.
+ auto pipeline = client
+ ->connect(folly::SocketAddress(hostname, port, true),
+ std::chrono::duration_cast<milliseconds>(connect_timeout_))
+ .get();
+ auto dispatcher = std::make_shared<ClientDispatcher>();
+ dispatcher->setPipeline(pipeline);
+ return dispatcher;
+ } catch (const folly::AsyncSocketException &e) {
+ throw ConnectionException(folly::exception_wrapper{e});
+ }
}
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/connection/rpc-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 10faa7a..a16dca6 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -19,9 +19,12 @@
#include "connection/rpc-client.h"
+#include <folly/Format.h>
#include <folly/Logging.h>
+#include <folly/futures/Future.h>
#include <unistd.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>
+#include "exceptions/exception.h"
using hbase::security::User;
using std::chrono::nanoseconds;
@@ -55,7 +58,7 @@ folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string&
std::unique_ptr<Request> req,
std::shared_ptr<User> ticket) {
auto remote_id = std::make_shared<ConnectionId>(host, port, ticket);
- return GetConnection(remote_id)->SendRequest(std::move(req));
+ return SendRequest(remote_id, std::move(req));
}
folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
@@ -64,7 +67,49 @@ folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string&
std::shared_ptr<User> ticket,
const std::string& service_name) {
auto remote_id = std::make_shared<ConnectionId>(host, port, ticket, service_name);
- return GetConnection(remote_id)->SendRequest(std::move(req));
+ return SendRequest(remote_id, std::move(req));
+}
+
+/**
+ * Sends req via the pooled connection for remote_id; a ConnectionException closes that connection.
+ * There are two cases for ConnectionException, both with AsyncSocketException being a cause:
+ * 1. The first time connection establishment, i.e. GetConnection(remote_id); thrown synchronously.
+ * 2. Writing request down the pipeline, i.e. RpcConnection::SendRequest; delivered via the future.
+ * remote_id is captured by value below because onError may run after this function has returned.
+ */
+folly::Future<std::unique_ptr<Response>> RpcClient::SendRequest(
+    std::shared_ptr<ConnectionId> remote_id, std::unique_ptr<Request> req) {
+  try {
+    return GetConnection(remote_id)
+        ->SendRequest(std::move(req))
+        .onError([this, remote_id](const folly::exception_wrapper& ew) {
+          VLOG(3) << folly::sformat("RpcClient Exception: {}", ew.what());
+          ew.with_exception([&, this](const hbase::ConnectionException& re) {
+            /* bad connection, remove it from pool. */
+            cp_->Close(remote_id);
+          });
+          return GetFutureWithException(ew);
+        });
+  } catch (const ConnectionException& e) {
+    CHECK(e.cause().get_exception() != nullptr);
+    VLOG(3) << folly::sformat("RpcClient Exception: {}", e.cause().what());
+    /* bad connection, remove it from pool. */
+    cp_->Close(remote_id);
+    return GetFutureWithException(e);
+  }
+}
+
+/** Wraps e in a folly::exception_wrapper and delegates to the overload below. */
+template <typename EXCEPTION>
+folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(const EXCEPTION& e) {
+  return GetFutureWithException(folly::exception_wrapper{e});
+}
+
+/**
+ * Returns a future that is already completed with the exception held by ew.
+ */
+folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(
+    const folly::exception_wrapper& ew) {
+  return folly::makeFuture<std::unique_ptr<Response>>(ew);
+}
std::shared_ptr<RpcConnection> RpcClient::GetConnection(std::shared_ptr<ConnectionId> remote_id) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/connection/rpc-client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index 0ecde5b..8145be4 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -20,6 +20,7 @@
#include <google/protobuf/service.h>
+#include <folly/ExceptionWrapper.h>
#include <chrono>
#include <memory>
#include <string>
@@ -65,6 +66,13 @@ class RpcClient {
private:
std::shared_ptr<RpcConnection> GetConnection(std::shared_ptr<ConnectionId> remote_id);
+ folly::Future<std::unique_ptr<Response>> SendRequest(std::shared_ptr<ConnectionId> remote_id,
+ std::unique_ptr<Request> req);
+ template <typename EXCEPTION>
+ folly::Future<std::unique_ptr<Response>> GetFutureWithException(const EXCEPTION &e);
+
+ folly::Future<std::unique_ptr<Response>> GetFutureWithException(
+ const folly::exception_wrapper &ew);
private:
std::shared_ptr<ConnectionPool> cp_;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/connection/rpc-test-server-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
index 7f41b7e..8e405ef 100644
--- a/hbase-native-client/connection/rpc-test-server-handler.cc
+++ b/hbase-native-client/connection/rpc-test-server-handler.cc
@@ -72,6 +72,9 @@ std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
} else if (method_name == "addr") {
result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
std::make_shared<AddrResponseProto>(), method_name);
+ } else if (method_name == "socketNotOpen") {
+ result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+ std::make_shared<EmptyResponseProto>(), method_name);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/connection/rpc-test-server.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
index b9e1f13..f350d6a 100644
--- a/hbase-native-client/connection/rpc-test-server.cc
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -88,6 +88,9 @@ Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Req
// TODO:
} else if (method_name == "addr") {
// TODO:
+ } else if (method_name == "socketNotOpen") {
+ auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+ response->set_resp_msg(pb_resp_msg);
}
return folly::makeFuture<std::unique_ptr<Response>>(std::move(response));
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/connection/rpc-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
index 2949fe9..e7f678d 100644
--- a/hbase-native-client/connection/rpc-test.cc
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -20,10 +20,12 @@
#include <wangle/bootstrap/ClientBootstrap.h>
#include <wangle/channel/Handler.h>
+#include <folly/Format.h>
#include <folly/Logging.h>
#include <folly/SocketAddress.h>
#include <folly/String.h>
#include <folly/experimental/TestUtil.h>
+#include <folly/io/async/AsyncSocketException.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
@@ -41,6 +43,9 @@ using namespace folly;
using namespace hbase;
DEFINE_int32(port, 0, "test server port");
+DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result");
+DEFINE_string(fail_format, "Shouldn't get here, exception is expected for RPC {}.",
+ "output format of enforcing fail");
typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap;
typedef std::shared_ptr<ServerTestBootstrap> ServerPtr;
@@ -91,9 +96,10 @@ TEST_F(RpcTest, Echo) {
auto server_addr = GetRpcServerAddress(server);
auto client = CreateRpcClient(conf);
- std::string greetings = "hello, hbase server!";
+ auto method = "echo";
+ auto greetings = "hello, hbase server!";
auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
- std::make_shared<EchoResponseProto>(), "echo");
+ std::make_shared<EchoResponseProto>(), method);
auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
pb_msg->set_message(greetings);
@@ -101,14 +107,14 @@ TEST_F(RpcTest, Echo) {
client
->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
hbase::security::User::defaultUser())
- .then([=](std::unique_ptr<Response> response) {
+ .then([&](std::unique_ptr<Response> response) {
auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
EXPECT_TRUE(pb_resp != nullptr);
- VLOG(1) << "RPC echo returned: " + pb_resp->message();
+ VLOG(1) << folly::sformat(FLAGS_result_format, method, pb_resp->message());
EXPECT_EQ(greetings, pb_resp->message());
})
- .onError([](const folly::exception_wrapper& ew) {
- FAIL() << "Shouldn't get here, no exception is expected for RPC echo.";
+ .onError([&](const folly::exception_wrapper& ew) {
+ FAIL() << folly::sformat(FLAGS_fail_format, method);
});
server->stop();
@@ -118,23 +124,24 @@ TEST_F(RpcTest, Echo) {
/**
* test error
*/
-TEST_F(RpcTest, error) {
+TEST_F(RpcTest, Error) {
auto conf = CreateConf();
auto server = CreateRpcServer();
auto server_addr = GetRpcServerAddress(server);
auto client = CreateRpcClient(conf);
+ auto method = "error";
auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
- std::make_shared<EmptyResponseProto>(), "error");
+ std::make_shared<EmptyResponseProto>(), method);
/* sending out request */
client
->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
hbase::security::User::defaultUser())
- .then([=](std::unique_ptr<Response> response) {
- FAIL() << "Shouldn't get here, exception is expected for RPC error.";
+ .then([&](std::unique_ptr<Response> response) {
+ FAIL() << folly::sformat(FLAGS_fail_format, method);
})
- .onError([](const folly::exception_wrapper& ew) {
- VLOG(1) << "RPC error returned with exception.";
+ .onError([&](const folly::exception_wrapper& ew) {
+ VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString();
std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString();
@@ -142,14 +149,57 @@ TEST_F(RpcTest, error) {
EXPECT_TRUE(bool(ew));
EXPECT_EQ(kRemoteException, ew.class_name());
- /* verify RemoteException */
- EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& re) {
- /* verify DoNotRetryIOException*/
- EXPECT_EQ(kRpcTestException, re.exception_class_name());
- EXPECT_EQ(kRpcTestException + ": server error!", re.stack_trace());
+ /* verify exception */
+ EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& e) {
+ EXPECT_EQ(kRpcTestException, e.exception_class_name());
+ EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace());
}));
});
server->stop();
server->join();
}
+
+TEST_F(RpcTest, SocketNotOpen) {
+ auto conf = CreateConf();
+ auto server = CreateRpcServer();
+ auto server_addr = GetRpcServerAddress(server);
+ auto client = CreateRpcClient(conf);
+
+ auto method = "socketNotOpen";
+ auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+ std::make_shared<EmptyResponseProto>(), method);
+
+ server->stop();
+ server->join();
+
+ /* sending out request */
+ client
+ ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
+ hbase::security::User::defaultUser())
+ .then([&](std::unique_ptr<Response> response) {
+ FAIL() << folly::sformat(FLAGS_fail_format, method);
+ })
+ .onError([&](const folly::exception_wrapper& ew) {
+ VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
+ std::string kConnectionException =
+ demangle(typeid(hbase::ConnectionException)).toStdString();
+ std::string kAsyncSocketException =
+ demangle(typeid(folly::AsyncSocketException)).toStdString();
+
+ /* verify exception_wrapper */
+ EXPECT_TRUE(bool(ew));
+ EXPECT_EQ(kConnectionException, ew.class_name());
+
+ /* verify exception */
+ EXPECT_TRUE(ew.with_exception([&](const hbase::ConnectionException& e) {
+ EXPECT_TRUE(bool(e.cause()));
+ EXPECT_EQ(kAsyncSocketException, e.cause().class_name());
+ VLOG(1) << folly::sformat(FLAGS_result_format, method, e.cause().what());
+ e.cause().with_exception([&](const folly::AsyncSocketException& ase) {
+ EXPECT_EQ(AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN, ase.getType());
+ EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno());
+ });
+ }));
+ });
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/exceptions/exception.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h
index bdedff4..bc3b291 100644
--- a/hbase-native-client/exceptions/exception.h
+++ b/hbase-native-client/exceptions/exception.h
@@ -59,7 +59,7 @@ class IOException : public std::logic_error {
IOException(const std::string& what, bool do_not_retry)
: logic_error(what), do_not_retry_(do_not_retry) {}
- IOException(const std::string& what, folly::exception_wrapper cause)
+ IOException(const std::string& what, const folly::exception_wrapper& cause)
: logic_error(what), cause_(cause), do_not_retry_(false) {}
IOException(const std::string& what, folly::exception_wrapper cause, bool do_not_retry)
@@ -67,7 +67,7 @@ class IOException : public std::logic_error {
virtual ~IOException() = default;
- virtual folly::exception_wrapper cause() { return cause_; }
+ virtual folly::exception_wrapper cause() const { return cause_; }
bool do_not_retry() const { return do_not_retry_; }
@@ -115,6 +115,18 @@ class RetriesExhaustedException : public IOException {
int32_t num_retries_;
};
+class ConnectionException : public IOException {
+ public:
+ ConnectionException() {}
+
+ ConnectionException(const std::string& what) : IOException(what) {}
+
+ ConnectionException(const folly::exception_wrapper& cause) : IOException("", cause) {}
+
+ ConnectionException(const std::string& what, const folly::exception_wrapper& cause)
+ : IOException(what, cause) {}
+};
+
class RemoteException : public IOException {
public:
RemoteException() : IOException(), port_(0) {}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5261c67b/hbase-native-client/if/test_rpc_service.proto
----------------------------------------------------------------------
diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto
index 5f91dc4..2730403 100644
--- a/hbase-native-client/if/test_rpc_service.proto
+++ b/hbase-native-client/if/test_rpc_service.proto
@@ -32,4 +32,5 @@ service TestProtobufRpcProto {
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
rpc pause(PauseRequestProto) returns (EmptyResponseProto);
rpc addr(EmptyRequestProto) returns (AddrResponseProto);
+ rpc socketNotOpen(EmptyRequestProto) returns (EmptyResponseProto);
}