You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/08/01 19:04:18 UTC

hbase git commit: HBASE-18466. [C++] Support handling exception in RpcTestServer

Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 d6c720b8f -> f602e8a77


HBASE-18466. [C++] Support handling exception in RpcTestServer

Signed-off-by: Enis Soztutar <en...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f602e8a7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f602e8a7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f602e8a7

Branch: refs/heads/HBASE-14850
Commit: f602e8a772a62063a2b18542c4ed9b57bf6d1cfe
Parents: d6c720b
Author: Xiaobing Zhou <xz...@hortonworks.com>
Authored: Thu Jul 27 17:33:08 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Jul 31 19:22:24 2017 -0700

----------------------------------------------------------------------
 .../connection/rpc-test-server-handler.cc       |   7 +-
 .../connection/rpc-test-server-handler.h        |   2 +-
 .../connection/rpc-test-server.cc               |  31 ++++-
 .../connection/rpc-test-server.h                |  24 +++-
 hbase-native-client/connection/rpc-test.cc      | 129 ++++++++++++++-----
 hbase-native-client/serde/rpc-serde.cc          |  27 ++++
 hbase-native-client/serde/rpc-serde.h           |  14 ++
 7 files changed, 194 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test-server-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
index 7d2f407..7f41b7e 100644
--- a/hbase-native-client/connection/rpc-test-server-handler.cc
+++ b/hbase-native-client/connection/rpc-test-server-handler.cc
@@ -46,16 +46,17 @@ void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IO
 }
 
 folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx,
-                                                                std::unique_ptr<Response> r) {
+                                                                std::unique_ptr<Response> resp) {
   VLOG(3) << "Writing RPC Request";
   // Send the data down the pipeline.
-  return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get()));
+  return ctx->fireWrite(
+      serde_.Response(resp->call_id(), resp->resp_msg().get(), resp->exception()));
 }
 
 std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
     const std::string& method_name) {
   std::unique_ptr<Request> result = nullptr;
-  ;
+
   if (method_name == "ping") {
     result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
                                        std::make_shared<EmptyResponseProto>(), method_name);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test-server-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h
index 4c84615..ab0264f 100644
--- a/hbase-native-client/connection/rpc-test-server-handler.h
+++ b/hbase-native-client/connection/rpc-test-server-handler.h
@@ -36,7 +36,7 @@ class RpcTestServerSerializeHandler
 
   void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override;
 
-  folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> r) override;
+  folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> resp) override;
 
  private:
   std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test-server.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
index d3a30b1..b9e1f13 100644
--- a/hbase-native-client/connection/rpc-test-server.cc
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -30,19 +30,35 @@ namespace hbase {
 
 RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline(
     std::shared_ptr<AsyncTransportWrapper> sock) {
+  if (service_ == nullptr) {
+    initService(sock);
+  }
+  CHECK(service_ != nullptr);
+
   auto pipeline = RpcTestServerSerializePipeline::create();
   pipeline->addBack(AsyncSocketHandler(sock));
   // ensure we can write from any thread
   pipeline->addBack(EventBaseHandler());
   pipeline->addBack(LengthFieldBasedFrameDecoder());
   pipeline->addBack(RpcTestServerSerializeHandler());
-  pipeline->addBack(
-      MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(&service_));
+  pipeline->addBack(MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(
+      service_.get()));
   pipeline->finalize();
 
   return pipeline;
 }
 
+void RpcTestServerPipelineFactory::initService(std::shared_ptr<AsyncTransportWrapper> sock) {
+  /* get server address */
+  SocketAddress localAddress;
+  sock->getLocalAddress(&localAddress);
+
+  /* init service with server address */
+  service_ = std::make_shared<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>>(
+      std::make_shared<CPUThreadPoolExecutor>(1),
+      std::make_shared<RpcTestService>(std::make_shared<SocketAddress>(localAddress)));
+}
+
 Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) {
   /* build Response */
   auto response = std::make_unique<Response>();
@@ -54,11 +70,20 @@ Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Req
     response->set_resp_msg(pb_resp_msg);
   } else if (method_name == "echo") {
     auto pb_resp_msg = std::make_shared<EchoResponseProto>();
+    /* get msg from client */
     auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
     pb_resp_msg->set_message(pb_req_msg->message());
     response->set_resp_msg(pb_resp_msg);
+    VLOG(1) << "RPC server:"
+            << " echo called, " << pb_req_msg->message();
+
   } else if (method_name == "error") {
-    // TODO:
+    auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+    response->set_resp_msg(pb_resp_msg);
+    VLOG(1) << "RPC server:"
+            << " error called.";
+    response->set_exception(folly::make_exception_wrapper<RpcTestException>("server error!"));
+
   } else if (method_name == "pause") {
     // TODO:
   } else if (method_name == "addr") {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test-server.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h
index c3225ff..955560e 100644
--- a/hbase-native-client/connection/rpc-test-server.h
+++ b/hbase-native-client/connection/rpc-test-server.h
@@ -17,12 +17,14 @@
  *
  */
 #pragma once
+#include <folly/SocketAddress.h>
 #include <wangle/concurrent/CPUThreadPoolExecutor.h>
 #include <wangle/service/ExecutorFilter.h>
 #include <wangle/service/Service.h>
 
 #include "connection/request.h"
 #include "connection/response.h"
+#include "exceptions/exception.h"
 
 using namespace hbase;
 using namespace folly;
@@ -31,11 +33,24 @@ using namespace wangle;
 namespace hbase {
 using RpcTestServerSerializePipeline = wangle::Pipeline<IOBufQueue&, std::unique_ptr<Response>>;
 
+class RpcTestException : public IOException {
+ public:
+  RpcTestException() {}
+  RpcTestException(const std::string& what) : IOException(what) {}
+  RpcTestException(const std::string& what, const folly::exception_wrapper& cause)
+      : IOException(what, cause) {}
+  RpcTestException(const folly::exception_wrapper& cause) : IOException("", cause) {}
+};
+
 class RpcTestService : public Service<std::unique_ptr<Request>, std::unique_ptr<Response>> {
  public:
-  RpcTestService() {}
+  RpcTestService(std::shared_ptr<folly::SocketAddress> socket_address)
+      : socket_address_(socket_address) {}
   virtual ~RpcTestService() = default;
   Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> request) override;
+
+ private:
+  std::shared_ptr<folly::SocketAddress> socket_address_;
 };
 
 class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSerializePipeline> {
@@ -44,7 +59,10 @@ class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSeriali
       std::shared_ptr<AsyncTransportWrapper> sock) override;
 
  private:
-  ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>> service_{
-      std::make_shared<CPUThreadPoolExecutor>(1), std::make_shared<RpcTestService>()};
+  void initService(std::shared_ptr<AsyncTransportWrapper> sock);
+
+ private:
+  std::shared_ptr<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>> service_{
+      nullptr};
 };
 }  // end of namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/connection/rpc-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
index d4cd89f..2949fe9 100644
--- a/hbase-native-client/connection/rpc-test.cc
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -30,6 +30,7 @@
 #include <boost/thread.hpp>
 
 #include "connection/rpc-client.h"
+#include "exceptions/exception.h"
 #include "if/test.pb.h"
 #include "rpc-test-server.h"
 #include "security/user.h"
@@ -40,46 +41,114 @@ using namespace folly;
 using namespace hbase;
 
 DEFINE_int32(port, 0, "test server port");
+typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap;
+typedef std::shared_ptr<ServerTestBootstrap> ServerPtr;
 
-TEST(RpcTestServer, echo) {
-  /* create conf */
+class RpcTest : public ::testing::Test {
+ public:
+  static void SetUpTestCase() { google::InstallFailureSignalHandler(); }
+};
+
+std::shared_ptr<Configuration> CreateConf() {
   auto conf = std::make_shared<Configuration>();
   conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true");
+  return conf;
+}
 
+ServerPtr CreateRpcServer() {
   /* create rpc test server */
-  auto server = std::make_shared<ServerBootstrap<RpcTestServerSerializePipeline>>();
+  auto server = std::make_shared<ServerTestBootstrap>();
   server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>());
   server->bind(FLAGS_port);
-  folly::SocketAddress server_addr;
-  server->getSockets()[0]->getAddress(&server_addr);
+  return server;
+}
 
-  /* create RpcClient */
+std::shared_ptr<folly::SocketAddress> GetRpcServerAddress(ServerPtr server) {
+  auto addr = std::make_shared<folly::SocketAddress>();
+  server->getSockets()[0]->getAddress(addr.get());
+  return addr;
+}
+
+std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf) {
   auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto client = std::make_shared<RpcClient>(io_executor, nullptr, conf);
+  return client;
+}
+
+std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf,
+                                           std::chrono::nanoseconds connect_timeout) {
+  auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+  auto client = std::make_shared<RpcClient>(io_executor, nullptr, conf, connect_timeout);
+  return client;
+}
+
+/**
+ * test echo
+ */
+TEST_F(RpcTest, Echo) {
+  auto conf = CreateConf();
+  auto server = CreateRpcServer();
+  auto server_addr = GetRpcServerAddress(server);
+  auto client = CreateRpcClient(conf);
+
+  std::string greetings = "hello, hbase server!";
+  auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+                                           std::make_shared<EchoResponseProto>(), "echo");
+  auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+  pb_msg->set_message(greetings);
+
+  /* sending out request */
+  client
+      ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
+                  hbase::security::User::defaultUser())
+      .then([=](std::unique_ptr<Response> response) {
+        auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
+        EXPECT_TRUE(pb_resp != nullptr);
+        VLOG(1) << "RPC echo returned: " + pb_resp->message();
+        EXPECT_EQ(greetings, pb_resp->message());
+      })
+      .onError([](const folly::exception_wrapper& ew) {
+        FAIL() << "Shouldn't get here, no exception is expected for RPC echo.";
+      });
+
+  server->stop();
+  server->join();
+}
+
+/**
+ * test error
+ */
+TEST_F(RpcTest, error) {
+  auto conf = CreateConf();
+  auto server = CreateRpcServer();
+  auto server_addr = GetRpcServerAddress(server);
+  auto client = CreateRpcClient(conf);
+
+  auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                           std::make_shared<EmptyResponseProto>(), "error");
+  /* sending out request */
+  client
+      ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request),
+                  hbase::security::User::defaultUser())
+      .then([=](std::unique_ptr<Response> response) {
+        FAIL() << "Shouldn't get here, exception is expected for RPC error.";
+      })
+      .onError([](const folly::exception_wrapper& ew) {
+        VLOG(1) << "RPC error returned with exception.";
+        std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString();
+        std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString();
+
+        /* verify exception_wrapper */
+        EXPECT_TRUE(bool(ew));
+        EXPECT_EQ(kRemoteException, ew.class_name());
 
-  auto rpc_client = std::make_shared<RpcClient>(io_executor, nullptr, conf);
-
-  /**
-   * test echo
-   */
-  try {
-    std::string greetings = "hello, hbase server!";
-    auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
-                                             std::make_shared<EchoResponseProto>(), "echo");
-    auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
-    pb_msg->set_message(greetings);
-
-    /* sending out request */
-    rpc_client
-        ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request),
-                    hbase::security::User::defaultUser())
-        .then([=](std::unique_ptr<Response> response) {
-          auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
-          VLOG(1) << "message returned: " + pb_resp->message();
-          EXPECT_EQ(greetings, pb_resp->message());
-        });
-  } catch (const std::exception& e) {
-    throw e;
-  }
+        /* verify RemoteException */
+        EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& re) {
+          /* verify the wrapped RpcTestException */
+          EXPECT_EQ(kRpcTestException, re.exception_class_name());
+          EXPECT_EQ(kRpcTestException + ": server error!", re.stack_trace());
+        }));
+      });
 
   server->stop();
   server->join();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/serde/rpc-serde.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.cc b/hbase-native-client/serde/rpc-serde.cc
index 9e1f79a..70a57e8 100644
--- a/hbase-native-client/serde/rpc-serde.cc
+++ b/hbase-native-client/serde/rpc-serde.cc
@@ -40,6 +40,8 @@ using google::protobuf::io::CodedInputStream;
 using google::protobuf::io::CodedOutputStream;
 using google::protobuf::io::ZeroCopyOutputStream;
 
+using namespace hbase::pb;
+
 namespace hbase {
 
 static const std::string PREAMBLE = "HBas";
@@ -174,6 +176,31 @@ std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
   return PrependLength(std::move(ser_header));
 }
 
+std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
+                                                 const google::protobuf::Message *msg,
+                                                 const folly::exception_wrapper &exception) {
+  /* create ResponseHeader */
+  pb::ResponseHeader rh;
+  rh.set_call_id(call_id);
+
+  /* create ExceptionResponse */
+  if (bool(exception)) {
+    VLOG(1) << "packing ExceptionResponse";
+    auto exception_response = new pb::ExceptionResponse();
+    exception_response->set_exception_class_name(exception.class_name().c_str());
+    exception_response->set_stack_trace(exception.what().c_str());
+    rh.set_allocated_exception(exception_response);
+  }
+
+  /* serialize Response header and body */
+  auto ser_header = SerializeDelimited(rh);
+  auto ser_resp = SerializeDelimited(*msg);
+  ser_header->appendChain(std::move(ser_resp));
+
+  VLOG(3) << "Converted hbase::Response to folly::IOBuf";
+  return PrependLength(std::move(ser_header));
+}
+
 std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
                                                          uint32_t offset, uint32_t length) {
   if (codec_ == nullptr) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f602e8a7/hbase-native-client/serde/rpc-serde.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.h b/hbase-native-client/serde/rpc-serde.h
index 0e1d44e..6941f62 100644
--- a/hbase-native-client/serde/rpc-serde.h
+++ b/hbase-native-client/serde/rpc-serde.h
@@ -21,10 +21,12 @@
 #include <memory>
 #include <string>
 
+#include <folly/ExceptionWrapper.h>
 #include "if/HBase.pb.h"
 #include "serde/cell-scanner.h"
 #include "serde/codec.h"
 
+using namespace folly;
 // Forward
 namespace folly {
 class IOBuf;
@@ -110,6 +112,18 @@ class RpcSerde {
                                          const google::protobuf::Message *msg);
 
   /**
+   * Serialize a response message into a protobuf.
+   * Response consists of:
+   *
+   * - Big endian length
+   * - ResponseHeader object
+   * - The passed in hbase::Response object
+   */
+  std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id,
+                                         const google::protobuf::Message *msg,
+                                         const folly::exception_wrapper &exception);
+
+  /**
    * Serialize a message in the delimited format.
    * Delimited format consists of the following:
    *