You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:47 UTC

[hbase] 119/133: HBASE-18578 [C++] Add pause for RPC test

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 515a1651a5b53a73b45e4b7bdb21a8ffa606b9f9
Author: Xiaobing Zhou <xz...@hortonworks.com>
AuthorDate: Tue Aug 22 12:01:21 2017 -0700

    HBASE-18578 [C++] Add pause for RPC test
    
    Signed-off-by: Enis Soztutar <en...@apache.org>
---
 hbase-native-client/connection/rpc-test-server.cc |  9 +++-
 hbase-native-client/connection/rpc-test.cc        | 61 +++++++++++++++++++----
 2 files changed, 59 insertions(+), 11 deletions(-)

diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
index 6132fbb..707bca7 100644
--- a/hbase-native-client/connection/rpc-test-server.cc
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -88,7 +88,14 @@ Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Req
     response->set_exception(folly::make_exception_wrapper<RpcTestException>("server error!"));
 
   } else if (method_name == "pause") {
-    // TODO:
+    auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+    /* sleeping */
+    auto pb_req_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg());
+    std::this_thread::sleep_for(std::chrono::milliseconds(pb_req_msg->ms()));
+    response->set_resp_msg(pb_resp_msg);
+    VLOG(1) << "RPC server:"
+            << " pause called, " << pb_req_msg->ms() << " ms";
+
   } else if (method_name == "addr") {
     // TODO:
   } else if (method_name == "socketNotOpen") {
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
index 4688950..d541397 100644
--- a/hbase-native-client/connection/rpc-test.cc
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -30,6 +30,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 #include <boost/thread.hpp>
+#include <chrono>
 
 #include "connection/rpc-client.h"
 #include "exceptions/exception.h"
@@ -41,11 +42,14 @@
 using namespace wangle;
 using namespace folly;
 using namespace hbase;
+using namespace std::chrono;
 
 DEFINE_int32(port, 0, "test server port");
 DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result");
-DEFINE_string(fail_format, "Shouldn't get here, exception is expected for RPC {}.",
-              "output format of enforcing fail");
+DEFINE_string(fail_ex_format, "Shouldn't get here, exception is expected for RPC {}.",
+              "output format of enforcing fail with exception");
+DEFINE_string(fail_no_ex_format, "Shouldn't get here, exception is not expected for RPC {}.",
+              "output format of enforcing fail without exception");
 typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap;
 typedef std::shared_ptr<ServerTestBootstrap> ServerPtr;
 
@@ -110,8 +114,8 @@ TEST_F(RpcTest, Ping) {
         VLOG(1) << folly::sformat(FLAGS_result_format, method, "");
       })
       .onError([&](const folly::exception_wrapper& ew) {
-        FAIL() << folly::sformat(FLAGS_fail_format, method);
-      });
+        FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
+      }).get();
 
   server->stop();
   server->join();
@@ -144,8 +148,8 @@ TEST_F(RpcTest, Echo) {
         EXPECT_EQ(greetings, pb_resp->message());
       })
       .onError([&](const folly::exception_wrapper& ew) {
-        FAIL() << folly::sformat(FLAGS_fail_format, method);
-      });
+        FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
+      }).get();
 
   server->stop();
   server->join();
@@ -168,7 +172,7 @@ TEST_F(RpcTest, Error) {
       ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
                   hbase::security::User::defaultUser())
       .then([&](std::unique_ptr<Response> response) {
-        FAIL() << folly::sformat(FLAGS_fail_format, method);
+        FAIL() << folly::sformat(FLAGS_fail_ex_format, method);
       })
       .onError([&](const folly::exception_wrapper& ew) {
         VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
@@ -184,7 +188,7 @@ TEST_F(RpcTest, Error) {
           EXPECT_EQ(kRpcTestException, e.exception_class_name());
           EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace());
         }));
-      });
+      }).get();
 
   server->stop();
   server->join();
@@ -208,7 +212,7 @@ TEST_F(RpcTest, SocketNotOpen) {
       ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
                   hbase::security::User::defaultUser())
       .then([&](std::unique_ptr<Response> response) {
-        FAIL() << folly::sformat(FLAGS_fail_format, method);
+        FAIL() << folly::sformat(FLAGS_fail_ex_format, method);
       })
       .onError([&](const folly::exception_wrapper& ew) {
         VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
@@ -231,5 +235,42 @@ TEST_F(RpcTest, SocketNotOpen) {
             EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno());
           });
         }));
-      });
+      }).get();
+}
+
+/**
+ * test pause
+ */
+TEST_F(RpcTest, Pause) {
+  int ms = 500;
+
+  auto conf = CreateConf();
+  auto server = CreateRpcServer();
+  auto server_addr = GetRpcServerAddress(server);
+  auto client =
+      CreateRpcClient(conf, std::chrono::duration_cast<nanoseconds>(milliseconds(2 * ms)));
+
+  auto method = "pause";
+  auto request = std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
+                                           std::make_shared<EmptyResponseProto>(), method);
+  auto pb_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg());
+
+  pb_msg->set_ms(ms);
+
+  /* sending out request */
+  client
+      ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
+                  hbase::security::User::defaultUser())
+      .then([&](std::unique_ptr<Response> response) {
+        auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg());
+        EXPECT_TRUE(pb_resp != nullptr);
+        VLOG(1) << folly::sformat(FLAGS_result_format, method, "");
+      })
+      .onError([&](const folly::exception_wrapper& ew) {
+        VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
+        FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
+      }).get();
+
+  server->stop();
+  server->join();
 }