You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:47 UTC
[hbase] 119/133: HBASE-18578 [C++] Add pause for RPC test
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 515a1651a5b53a73b45e4b7bdb21a8ffa606b9f9
Author: Xiaobing Zhou <xz...@hortonworks.com>
AuthorDate: Tue Aug 22 12:01:21 2017 -0700
HBASE-18578 [C++] Add pause for RPC test
Signed-off-by: Enis Soztutar <en...@apache.org>
---
hbase-native-client/connection/rpc-test-server.cc | 9 +++-
hbase-native-client/connection/rpc-test.cc | 61 +++++++++++++++++++----
2 files changed, 59 insertions(+), 11 deletions(-)
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
index 6132fbb..707bca7 100644
--- a/hbase-native-client/connection/rpc-test-server.cc
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -88,7 +88,14 @@ Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Req
response->set_exception(folly::make_exception_wrapper<RpcTestException>("server error!"));
} else if (method_name == "pause") {
- // TODO:
+ auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+ /* sleeping */
+ auto pb_req_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg());
+ std::this_thread::sleep_for(std::chrono::milliseconds(pb_req_msg->ms()));
+ response->set_resp_msg(pb_resp_msg);
+ VLOG(1) << "RPC server:"
+ << " pause called, " << pb_req_msg->ms() << " ms";
+
} else if (method_name == "addr") {
// TODO:
} else if (method_name == "socketNotOpen") {
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
index 4688950..d541397 100644
--- a/hbase-native-client/connection/rpc-test.cc
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -30,6 +30,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <boost/thread.hpp>
+#include <chrono>
#include "connection/rpc-client.h"
#include "exceptions/exception.h"
@@ -41,11 +42,14 @@
using namespace wangle;
using namespace folly;
using namespace hbase;
+using namespace std::chrono;
DEFINE_int32(port, 0, "test server port");
DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result");
-DEFINE_string(fail_format, "Shouldn't get here, exception is expected for RPC {}.",
- "output format of enforcing fail");
+DEFINE_string(fail_ex_format, "Shouldn't get here, exception is expected for RPC {}.",
+ "output format of enforcing fail with exception");
+DEFINE_string(fail_no_ex_format, "Shouldn't get here, exception is not expected for RPC {}.",
+ "output format of enforcing fail without exception");
typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap;
typedef std::shared_ptr<ServerTestBootstrap> ServerPtr;
@@ -110,8 +114,8 @@ TEST_F(RpcTest, Ping) {
VLOG(1) << folly::sformat(FLAGS_result_format, method, "");
})
.onError([&](const folly::exception_wrapper& ew) {
- FAIL() << folly::sformat(FLAGS_fail_format, method);
- });
+ FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
+ }).get();
server->stop();
server->join();
@@ -144,8 +148,8 @@ TEST_F(RpcTest, Echo) {
EXPECT_EQ(greetings, pb_resp->message());
})
.onError([&](const folly::exception_wrapper& ew) {
- FAIL() << folly::sformat(FLAGS_fail_format, method);
- });
+ FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
+ }).get();
server->stop();
server->join();
@@ -168,7 +172,7 @@ TEST_F(RpcTest, Error) {
->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
hbase::security::User::defaultUser())
.then([&](std::unique_ptr<Response> response) {
- FAIL() << folly::sformat(FLAGS_fail_format, method);
+ FAIL() << folly::sformat(FLAGS_fail_ex_format, method);
})
.onError([&](const folly::exception_wrapper& ew) {
VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
@@ -184,7 +188,7 @@ TEST_F(RpcTest, Error) {
EXPECT_EQ(kRpcTestException, e.exception_class_name());
EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace());
}));
- });
+ }).get();
server->stop();
server->join();
@@ -208,7 +212,7 @@ TEST_F(RpcTest, SocketNotOpen) {
->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
hbase::security::User::defaultUser())
.then([&](std::unique_ptr<Response> response) {
- FAIL() << folly::sformat(FLAGS_fail_format, method);
+ FAIL() << folly::sformat(FLAGS_fail_ex_format, method);
})
.onError([&](const folly::exception_wrapper& ew) {
VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
@@ -231,5 +235,42 @@ TEST_F(RpcTest, SocketNotOpen) {
EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno());
});
}));
- });
+ }).get();
+}
+
+/**
+ * Test pause: send a "pause" RPC asking for a sleep of ms milliseconds and expect a reply without error.
+ */
+TEST_F(RpcTest, Pause) {
+ int ms = 500;  // requested pause in milliseconds; copied into the request via set_ms() below
+
+ auto conf = CreateConf();
+ auto server = CreateRpcServer();
+ auto server_addr = GetRpcServerAddress(server);
+ auto client =  // 2nd argument is 2 * ms converted to nanoseconds; presumably a timeout - TODO confirm
+ CreateRpcClient(conf, std::chrono::duration_cast<nanoseconds>(milliseconds(2 * ms)));
+
+ auto method = "pause";  // matched by the test server's method_name == "pause" branch
+ auto request = std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
+ std::make_shared<EmptyResponseProto>(), method);
+ auto pb_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg());  // typed handle to the request message
+
+ pb_msg->set_ms(ms);
+
+ /* send the request; .then() is the success path, .onError() the failure path */
+ client
+ ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
+ hbase::security::User::defaultUser())
+ .then([&](std::unique_ptr<Response> response) {
+ auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg());
+ EXPECT_TRUE(pb_resp != nullptr);  // static cast has no type check: this only verifies a message is present
+ VLOG(1) << folly::sformat(FLAGS_result_format, method, "");
+ })
+ .onError([&](const folly::exception_wrapper& ew) {
+ VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
+ FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);  // no exception is expected for this RPC
+ }).get();  // NOTE(review): relies on folly Future::get() blocking until the chain completes - confirm
+
+ server->stop();
+ server->join();
}