You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:02 UTC
[hbase] 74/133: HBASE-17727 [C++] Make RespConverter work with
RawAsyncTableImpl
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 608a8c227c680defbdcd63c8bf9cba1bc2e237e7
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Tue Mar 28 13:52:31 2017 -0700
HBASE-17727 [C++] Make RespConverter work with RawAsyncTableImpl
---
.../core/async-rpc-retrying-test.cc | 62 ++++++++++------------
hbase-native-client/core/raw-async-table.cc | 11 ++--
hbase-native-client/core/raw-async-table.h | 8 +--
hbase-native-client/core/response-converter.cc | 4 --
4 files changed, 38 insertions(+), 47 deletions(-)
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index d0c7921..5086286 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -137,15 +137,15 @@ class MockRawAsyncTableImpl {
/* in real RawAsyncTableImpl, this should be private. */
template <typename REQ, typename PREQ, typename PRESP, typename RESP>
- folly::Future<RESP> Call(
- std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
- std::shared_ptr<RegionLocation> loc, const REQ& req,
- const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter,
- const hbase::RpcCall<PREQ, PRESP>& rpc_call,
- const RespConverter<RESP, PRESP>& resp_converter) {
+ folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client,
+ std::shared_ptr<HBaseRpcController> controller,
+ std::shared_ptr<RegionLocation> loc, const REQ& req,
+ ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
+ const hbase::RpcCall<PREQ, PRESP>& rpc_call,
+ RespConverter<RESP, PRESP> resp_converter) {
rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name())))
- .then([&, this](std::unique_ptr<PRESP> presp) {
- std::shared_ptr<hbase::Result> result = hbase::ResponseConverter::FromGetResponse(*presp);
+ .then([&, this, resp_converter](std::unique_ptr<PRESP> presp) {
+ RESP result = resp_converter(*presp);
promise_->setValue(result);
})
.onError([this](const std::exception& e) { promise_->setException(e); });
@@ -210,31 +210,27 @@ TEST(AsyncRpcRetryTest, TestGetBasic) {
auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>();
/* call with retry to get result */
- try {
- auto async_caller =
- builder->table(std::make_shared<TableName>(tn))
- ->row(row)
- ->rpc_timeout(conn->connection_conf()->read_rpc_timeout())
- ->operation_timeout(conn->connection_conf()->operation_timeout())
- ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
- std::shared_ptr<hbase::RegionLocation> loc,
- std::shared_ptr<hbase::RpcClient> rpc_client)
- -> folly::Future<std::shared_ptr<hbase::Result>> {
- return tableImpl->GetCall(rpc_client, controller, loc, get);
- })
- ->Build();
-
- auto result = async_caller->Call().get();
-
- // Test the values, should be same as in put executed on hbase shell
- ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
- EXPECT_EQ("test2", result->Row());
- EXPECT_EQ("value2", *(result->Value("d", "2")));
- EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
- } catch (std::exception& e) {
- LOG(ERROR) << e.what();
- throw e;
- }
+
+ auto async_caller =
+ builder->table(std::make_shared<TableName>(tn))
+ ->row(row)
+ ->rpc_timeout(conn->connection_conf()->read_rpc_timeout())
+ ->operation_timeout(conn->connection_conf()->operation_timeout())
+ ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
+ std::shared_ptr<hbase::RegionLocation> loc,
+ std::shared_ptr<hbase::RpcClient> rpc_client)
+ -> folly::Future<std::shared_ptr<hbase::Result>> {
+ return tableImpl->GetCall(rpc_client, controller, loc, get);
+ })
+ ->Build();
+
+ auto result = async_caller->Call().get();
+
+ // Test the values, should be same as in put executed on hbase shell
+ ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+ EXPECT_EQ("test2", result->Row());
+ EXPECT_EQ("value2", *(result->Value("d", "2")));
+ EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
table->Close();
client.Close();
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 641f3c8..88a3382 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -16,6 +16,7 @@
* limitations under the License.
*
*/
+#include <utility>
#include "core/raw-async-table.h"
#include "core/request-converter.h"
@@ -41,18 +42,16 @@ template <typename REQ, typename PREQ, typename PRESP, typename RESP>
folly::Future<RESP> RawAsyncTable::Call(
std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
std::shared_ptr<RegionLocation> loc, const REQ& req,
- const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter,
- const RespConverter<RESP, PRESP>& resp_converter) {
+ const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
+ const RespConverter<RESP, PRESP> resp_converter) {
std::unique_ptr<PREQ> preq = req_converter(req, loc->region_name());
// No need to make take a callable argument, it is always the same
return rpc_client
->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq),
User::defaultUser(), "ClientService")
- .then([&](const std::unique_ptr<Response>& presp) {
- return ResponseConverter::FromGetResponse(*presp);
- // return resp_converter(*presp); // TODO this is causing SEGFAULT, figure out why
- });
+ .then(
+ [resp_converter](const std::unique_ptr<PRESP>& presp) { return resp_converter(*presp); });
}
Future<std::shared_ptr<Result>> RawAsyncTable::Get(const hbase::Get& get) {
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index 527c7be..bbdc6bd 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -18,12 +18,12 @@
*/
#pragma once
+#include <folly/futures/Future.h>
+
#include <chrono>
#include <memory>
#include <string>
-#include <folly/futures/Future.h>
-
#include "core/async-connection.h"
#include "core/async-rpc-retrying-caller-factory.h"
#include "core/async-rpc-retrying-caller.h"
@@ -66,8 +66,8 @@ class RawAsyncTable {
folly::Future<RESP> Call(
std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
std::shared_ptr<RegionLocation> loc, const REQ& req,
- const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter,
- const RespConverter<RESP, PRESP>& resp_converter);
+ const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
+ const RespConverter<RESP, PRESP> resp_converter);
template <typename RESP>
std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(std::string row,
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index b11856c..b2fff34 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -36,14 +36,12 @@ ResponseConverter::~ResponseConverter() {}
// impl note: we are returning shared_ptr's instead of unique_ptr's because these
// go inside folly::Future's, making the move semantics extremely tricky.
std::shared_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) {
- LOG(INFO) << "FromGetResponse";
auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg());
return ToResult(get_resp->result(), resp.cell_scanner());
}
std::shared_ptr<Result> ResponseConverter::ToResult(
const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) {
- LOG(INFO) << "ToResult";
std::vector<std::shared_ptr<Cell>> vcells;
for (auto cell : result.cell()) {
std::shared_ptr<Cell> pcell =
@@ -59,13 +57,11 @@ std::shared_ptr<Result> ResponseConverter::ToResult(
}
// TODO: check associated cell count?
}
- LOG(INFO) << "Returning Result";
return std::make_shared<Result>(vcells, result.exists(), result.stale(), result.partial());
}
std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
- LOG(INFO) << "FromScanResponse:" << scan_resp->ShortDebugString();
int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
: scan_resp->results_size();