You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:46:18 UTC
[hbase] 90/133: HBASE-17907 [C++] End to end Scans from Client/Table
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 74e0b08266247ffb7aa3bcbc59fbed2ac60ff1dc
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Tue Jun 6 15:16:09 2017 -0700
HBASE-17907 [C++] End to end Scans from Client/Table
---
hbase-native-client/bin/format-code.sh | 4 +-
hbase-native-client/connection/client-handler.cc | 5 +-
hbase-native-client/connection/request.h | 6 +
hbase-native-client/connection/response.h | 22 +-
hbase-native-client/connection/rpc-client.cc | 2 -
hbase-native-client/connection/rpc-client.h | 12 +-
hbase-native-client/core/BUCK | 41 +-
hbase-native-client/core/async-client-scanner.cc | 142 +++++++
hbase-native-client/core/async-client-scanner.h | 119 ++++++
.../core/async-rpc-retrying-caller-factory.h | 124 +++++-
.../core/async-rpc-retrying-caller.cc | 12 +-
.../core/async-rpc-retrying-caller.h | 2 -
.../core/async-scan-rpc-retrying-caller.cc | 447 +++++++++++++++++++++
.../core/async-scan-rpc-retrying-caller.h | 233 +++++++++++
.../core/async-table-result-scanner.cc | 161 ++++++++
.../core/async-table-result-scanner.h | 98 +++++
hbase-native-client/core/cell-test.cc | 24 ++
hbase-native-client/core/cell.cc | 21 +-
hbase-native-client/core/cell.h | 5 +-
hbase-native-client/core/client-test.cc | 13 +-
hbase-native-client/core/delete-test.cc | 2 +-
hbase-native-client/core/delete.cc | 46 +--
hbase-native-client/core/get.cc | 3 +-
.../core/hbase-configuration-loader.h | 5 +-
hbase-native-client/core/hbase-rpc-controller.h | 8 +
hbase-native-client/core/meta-utils.cc | 15 +-
hbase-native-client/core/query.h | 19 +-
hbase-native-client/core/raw-async-table.cc | 27 +-
hbase-native-client/core/raw-async-table.h | 21 +-
.../core/raw-scan-result-consumer.h | 131 ++++++
hbase-native-client/core/region-location.h | 2 +-
hbase-native-client/core/request-converter-test.cc | 3 +-
hbase-native-client/core/request-converter.cc | 96 ++++-
hbase-native-client/core/request-converter.h | 15 +-
hbase-native-client/core/response-converter.cc | 38 +-
hbase-native-client/core/response-converter.h | 5 +-
.../core/{query.h => result-scanner.h} | 28 +-
hbase-native-client/core/result-test.cc | 26 +-
hbase-native-client/core/result.cc | 60 +--
hbase-native-client/core/result.h | 18 +-
hbase-native-client/core/scan-result-cache-test.cc | 177 ++++++++
hbase-native-client/core/scan-result-cache.cc | 160 ++++++++
hbase-native-client/core/scan-result-cache.h | 80 ++++
hbase-native-client/core/scan-test.cc | 15 +-
hbase-native-client/core/scan.cc | 19 +-
hbase-native-client/core/scan.h | 13 +-
hbase-native-client/core/scanner-test.cc | 368 +++++++++++++++++
hbase-native-client/core/simple-client.cc | 85 ++--
hbase-native-client/core/table.cc | 16 +
hbase-native-client/core/table.h | 5 +
hbase-native-client/exceptions/BUCK | 10 +-
hbase-native-client/exceptions/exception.h | 192 +++++----
hbase-native-client/test-util/BUCK | 62 +--
hbase-native-client/test-util/mini-cluster.cc | 38 +-
hbase-native-client/test-util/mini-cluster.h | 6 +-
hbase-native-client/test-util/test-util.cc | 9 +
hbase-native-client/test-util/test-util.h | 3 +
hbase-native-client/utils/BUCK | 1 +
hbase-native-client/utils/bytes-util-test.cc | 11 +
hbase-native-client/utils/bytes-util.h | 24 +-
.../{core/query.h => utils/optional.h} | 19 +-
61 files changed, 3008 insertions(+), 366 deletions(-)
diff --git a/hbase-native-client/bin/format-code.sh b/hbase-native-client/bin/format-code.sh
index 8a19930..fe236d8 100755
--- a/hbase-native-client/bin/format-code.sh
+++ b/hbase-native-client/bin/format-code.sh
@@ -19,5 +19,5 @@ set -euo pipefail
IFS=$'\n\t'
-find core connection serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 clang-format -i --style='{BasedOnStyle: Google, ColumnLimit: 100}'
-find core connection serde utils third-party security -name "BUCK" | xargs -P8 yapf -i --style=google
+find core connection exceptions serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 clang-format -i --style='{BasedOnStyle: Google, ColumnLimit: 100}'
+find core connection exceptions serde utils test-util third-party security -name "BUCK" | xargs -P8 yapf -i --style=google
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index ac16197..894ecb3 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -87,7 +87,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
if (cell_block_length > 0) {
auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length);
- received->set_cell_scanner(std::move(cell_scanner));
+ received->set_cell_scanner(std::shared_ptr<CellScanner>{cell_scanner.release()});
}
received->set_resp_msg(resp_msg);
@@ -129,8 +129,7 @@ folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Re
ctx->fireWrite(std::move(header));
});
- VLOG(3) << "Writing RPC Request with call_id:"
- << r->call_id(); // TODO: more logging for RPC Header
+ VLOG(3) << "Writing RPC Request:" << r->DebugString();
// Now store the call id to response.
resp_msgs_->insert(r->call_id(), r->resp_msg());
diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h
index 520b380..4b652c0 100644
--- a/hbase-native-client/connection/request.h
+++ b/hbase-native-client/connection/request.h
@@ -18,6 +18,7 @@
*/
#pragma once
+#include <folly/Conv.h>
#include <google/protobuf/message.h>
#include <cstdint>
@@ -64,6 +65,11 @@ class Request {
* method type to decode. */
std::string method() { return method_; }
+  /** Returns a one-line summary (call id, request message, method) for logging.
+   * req_msg_ defaults to nullptr, so a missing request message is rendered as "null"
+   * instead of being dereferenced. */
+  std::string DebugString() const {
+    std::string s{"call_id:"};
+    s += folly::to<std::string>(call_id_);
+    s += ", req_msg:";
+    s += req_msg_ ? req_msg_->ShortDebugString() : std::string{"null"};
+    s += ", method:";
+    s += method_;
+    return s;
+  }
+
private:
uint32_t call_id_;
std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr;
diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h
index c5472b0..38fdda0 100644
--- a/hbase-native-client/connection/response.h
+++ b/hbase-native-client/connection/response.h
@@ -18,12 +18,14 @@
*/
#pragma once
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+
#include <cstdint>
#include <memory>
+#include <string>
#include <utility>
-#include <folly/ExceptionWrapper.h>
-
#include "serde/cell-scanner.h"
// Forward
@@ -66,20 +68,26 @@ class Response {
resp_msg_ = std::move(response);
}
- void set_cell_scanner(std::unique_ptr<CellScanner> cell_scanner) {
- cell_scanner_ = std::move(cell_scanner);
- }
+  /** Stores the cell scanner decoded from this response's cell block. The argument is a sink
+   * parameter, so it is moved in rather than copied (avoids an extra atomic refcount). */
+  void set_cell_scanner(std::shared_ptr<CellScanner> cell_scanner) {
+    cell_scanner_ = std::move(cell_scanner);
+  }
- const std::unique_ptr<CellScanner>& cell_scanner() const { return cell_scanner_; }
+  /** Returns a shared handle to the cell scanner; empty (nullptr) if none was set. Returned as a
+   * non-const value so callers can move from it. */
+  std::shared_ptr<CellScanner> cell_scanner() const { return cell_scanner_; }
folly::exception_wrapper exception() { return exception_; }
void set_exception(folly::exception_wrapper value) { exception_ = value; }
+  /** Returns a one-line summary (call id and response message) for logging. resp_msg_ has no
+   * default value and may be unset, so a missing message is rendered as "null". */
+  std::string DebugString() const {
+    std::string s{"call_id:"};
+    s += folly::to<std::string>(call_id_);
+    s += ", resp_msg:";
+    s += resp_msg_ ? resp_msg_->ShortDebugString() : std::string{"null"};
+    return s;
+  }
+
private:
uint32_t call_id_;
std::shared_ptr<google::protobuf::Message> resp_msg_;
- std::unique_ptr<CellScanner> cell_scanner_;
+ std::shared_ptr<CellScanner> cell_scanner_;
folly::exception_wrapper exception_;
};
} // namespace hbase
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index fbbe3e7..10faa7a 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -22,8 +22,6 @@
#include <folly/Logging.h>
#include <unistd.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>
-#include <memory>
-#include <string>
using hbase::security::User;
using std::chrono::nanoseconds;
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index 8b0abe7..0ecde5b 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -18,17 +18,19 @@
*/
#pragma once
+#include <google/protobuf/service.h>
+
+#include <chrono>
+#include <memory>
+#include <string>
+#include <utility>
+
#include "connection/connection-id.h"
#include "connection/connection-pool.h"
#include "connection/request.h"
#include "connection/response.h"
#include "security/user.h"
-#include <google/protobuf/service.h>
-
-#include <chrono>
-#include <utility>
-
namespace hbase {
class RpcClient {
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 203e00d..81fd4a7 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -19,10 +19,12 @@
cxx_library(
name="core",
exported_headers=[
+ "async-client-scanner.h",
"async-connection.h",
"async-region-locator.h",
"async-rpc-retrying-caller-factory.h",
"async-rpc-retrying-caller.h",
+ "async-table-result-scanner.h",
"client.h",
"cell.h",
"hbase-macros.h",
@@ -42,10 +44,14 @@ cxx_library(
"delete.h",
"scan.h",
"result.h",
+ "result-scanner.h",
"request-converter.h",
"response-converter.h",
"table.h",
+ "async-scan-rpc-retrying-caller.h",
"raw-async-table.h",
+ "raw-scan-result-consumer.h",
+ "scan-result-cache.h",
"hbase-rpc-controller.h",
"time-range.h",
"zk-util.h",
@@ -58,9 +64,12 @@ cxx_library(
"async-batch-rpc-retrying-caller.h",
],
srcs=[
+ "async-client-scanner.cc",
"async-connection.cc",
"async-rpc-retrying-caller-factory.cc",
"async-rpc-retrying-caller.cc",
+ "async-scan-rpc-retrying-caller.cc",
+ "async-table-result-scanner.cc",
"cell.cc",
"client.cc",
"hbase-rpc-controller.cc",
@@ -73,6 +82,7 @@ cxx_library(
"put.cc",
"delete.cc",
"scan.cc",
+ "scan-result-cache.cc",
"raw-async-table.cc",
"result.cc",
"request-converter.cc",
@@ -109,7 +119,7 @@ cxx_library(
"configuration.cc",
"hbase-configuration-loader.cc",
],
- deps=["//third-party:folly"],
+ deps=["//utils:utils", "//third-party:folly"],
compiler_flags=['-Weffc++', '-ggdb'],
visibility=[
'PUBLIC',
@@ -156,8 +166,12 @@ cxx_test(
run_test_separately=True,)
cxx_test(
name="delete-test",
- srcs=["delete-test.cc",],
- deps=[":core",],
+ srcs=[
+ "delete-test.cc",
+ ],
+ deps=[
+ ":core",
+ ],
run_test_separately=True,)
cxx_test(
name="increment-test",
@@ -257,6 +271,27 @@ cxx_test(
],
run_test_separately=True,)
cxx_test(
+ name="scan-result-cache-test",
+ srcs=[
+ "scan-result-cache-test.cc",
+ ],
+ deps=[
+ ":core",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="scanner-test",
+ srcs=[
+ "scanner-test.cc",
+ ],
+ deps=[
+ ":core",
+ "//if:if",
+ "//serde:serde",
+ "//test-util:test-util",
+ ],
+ run_test_separately=True,)
+cxx_test(
name="zk-util-test",
srcs=[
"zk-util-test.cc",
diff --git a/hbase-native-client/core/async-client-scanner.cc b/hbase-native-client/core/async-client-scanner.cc
new file mode 100644
index 0000000..720ab25
--- /dev/null
+++ b/hbase-native-client/core/async-client-scanner.cc
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-client-scanner.h"
+
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+
+/**
+ * Stores the scan parameters and creates the result cache shared with every per-region scan
+ * caller. max_attempts_ is derived from max_retries via ConnectionUtils::Retries2Attempts.
+ * All members are initialized in declaration order.
+ */
+AsyncClientScanner::AsyncClientScanner(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
+    std::shared_ptr<pb::TableName> table_name, std::shared_ptr<RawScanResultConsumer> consumer,
+    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
+    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(conn),
+      scan_(scan),
+      table_name_(table_name),
+      results_cache_(std::make_shared<ScanResultCache>()),
+      consumer_(consumer),
+      pause_(pause),
+      max_retries_(max_retries),
+      scan_timeout_nanos_(scan_timeout_nanos),
+      rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count),
+      max_attempts_(ConnectionUtils::Retries2Attempts(max_retries)) {}
+
+/** Kicks off the scan. Later regions are opened from StartScan()'s completion callback. */
+void AsyncClientScanner::Start() {
+  OpenScanner();
+}
+
+/**
+ * Sends the initial ScanRequest for region `loc` through `rpc_client` and wraps the reply,
+ * together with the client, location and controller, in an OpenScannerResponse.
+ * NOTE(review): `controller` is not applied to the RPC here; it is only carried in the result.
+ */
+folly::Future<std::shared_ptr<OpenScannerResponse>> AsyncClientScanner::CallOpenScanner(
+    std::shared_ptr<hbase::RpcClient> rpc_client,
+    std::shared_ptr<hbase::HBaseRpcController> controller,
+    std::shared_ptr<hbase::RegionLocation> loc) {
+  ++open_scanner_tries_;
+
+  auto request =
+      RequestConverter::ToScanRequest(*scan_, loc->region_name(), scan_->Caching(), false);
+  auto self(shared_from_this());
+  VLOG(5) << "Calling RPC Client to open the scanner for region:" << loc->DebugString();
+
+  // `self` keeps this scanner alive until the RPC completes.
+  auto wrap_response = [self, loc, controller, rpc_client](const std::unique_ptr<Response>& presp) {
+    VLOG(5) << "Scan Response:" << presp->DebugString();
+    return std::make_shared<OpenScannerResponse>(rpc_client, presp, loc, controller);
+  };
+
+  const auto& server = loc->server_name();
+  return rpc_client
+      ->AsyncCall(server.host_name(), server.port(), std::move(request),
+                  security::User::defaultUser(), "ClientService")
+      .then(std::move(wrap_response));
+}
+
+/**
+ * Opens a scanner on the region that currently contains scan_->StartRow(). Region lookup and the
+ * open RPC go through a single-request retrying caller. On success the opened region is handed
+ * to StartScan(); on failure the error is forwarded to the consumer.
+ */
+void AsyncClientScanner::OpenScanner() {
+  auto self(shared_from_this());
+  open_scanner_tries_ = 1;
+
+  auto caller = conn_->caller_factory()
+                    ->Single<std::shared_ptr<OpenScannerResponse>>()
+                    ->table(table_name_)
+                    ->row(scan_->StartRow())
+                    ->locate_type(GetLocateType(*scan_))
+                    ->rpc_timeout(rpc_timeout_nanos_)
+                    ->operation_timeout(scan_timeout_nanos_)
+                    ->pause(pause_)
+                    ->max_retries(max_retries_)
+                    ->start_log_errors_count(start_log_errors_count_)
+                    // The action is stored in `caller` and may be invoked after this method has
+                    // returned (e.g. on a retry), so capture an owning reference to the scanner
+                    // rather than an implicit `this` via [&].
+                    ->action([self](std::shared_ptr<hbase::HBaseRpcController> controller,
+                                    std::shared_ptr<hbase::RegionLocation> loc,
+                                    std::shared_ptr<hbase::RpcClient> rpc_client)
+                                 -> folly::Future<std::shared_ptr<OpenScannerResponse>> {
+                      return self->CallOpenScanner(rpc_client, controller, loc);
+                    })
+                    ->Build();
+
+  caller->Call()
+      .then([self](std::shared_ptr<OpenScannerResponse> resp) {
+        VLOG(3) << "Opened scanner with id:" << resp->scan_resp_->scanner_id()
+                << ", region:" << resp->region_location_->DebugString() << ", starting scan";
+        self->StartScan(resp);
+      })
+      .onError([self](const folly::exception_wrapper& e) {
+        VLOG(3) << "Open scan request received error:" << e.what();
+        self->consumer_->OnError(e);
+      })
+      // Capturing `caller` here keeps it alive until the whole future chain has run.
+      .then([caller, self](const auto r) { return r; });
+}
+
+/**
+ * Continues the scan on the region opened by `resp` using an AsyncScanRpcRetryingCaller. When
+ * that caller finishes, either opens a scanner on the next region (has_more == true) or tells
+ * the consumer the scan is complete. Errors are forwarded to the consumer.
+ */
+void AsyncClientScanner::StartScan(std::shared_ptr<OpenScannerResponse> resp) {
+  auto self(shared_from_this());
+  const auto& scan_resp = resp->scan_resp_;
+  auto lease_timeout = TimeUtil::MillisToNanos(scan_resp->ttl());
+
+  auto builder = conn_->caller_factory()->Scan();
+  builder->scanner_id(scan_resp->scanner_id())
+      ->region_location(resp->region_location_)
+      ->scanner_lease_timeout(lease_timeout)
+      ->scan(scan_)
+      ->rpc_client(resp->rpc_client_)
+      ->consumer(consumer_)
+      ->results_cache(results_cache_)
+      ->rpc_timeout(rpc_timeout_nanos_)
+      ->scan_timeout(scan_timeout_nanos_)
+      ->pause(pause_)
+      ->max_retries(max_retries_)
+      ->start_log_errors_count(start_log_errors_count_);
+  auto caller = builder->Build();
+
+  // Each continuation captures `caller` so the scan caller outlives the future chain.
+  caller->Start(resp->controller_, scan_resp, resp->cell_scanner_)
+      .then([caller, self](const bool has_more) {
+        if (!has_more) {
+          self->consumer_->OnComplete();
+          return;
+        }
+        // open the next scanner on the next region.
+        self->OpenScanner();
+      })
+      .onError([caller, self](const folly::exception_wrapper& e) { self->consumer_->OnError(e); })
+      .then([caller, self](const auto r) { return r; });
+}
+
+/**
+ * Chooses how the region for the scan's start row is located. Reversed scans and
+ * include-start/stop-row controls do not exist yet, so the region currently containing the
+ * start row is always used.
+ */
+RegionLocateType AsyncClientScanner::GetLocateType(const Scan& scan) {
+  // TODO: Once Scan::IncludeStartRow() and Scan::IncludeStopRow() exist in C++, return the other
+  // RegionLocateTypes as ConnectionUtils.java #getLocateType() does.
+  // TODO: Likewise once reversed scans are implemented.
+  return RegionLocateType::kCurrent;
+}
+
+} // namespace hbase
diff --git a/hbase-native-client/core/async-client-scanner.h b/hbase-native-client/core/async-client-scanner.h
new file mode 100644
index 0000000..8663468
--- /dev/null
+++ b/hbase-native-client/core/async-client-scanner.h
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/HHWheelTimer.h>
+
+#include <algorithm>
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "connection/rpc-client.h"
+#include "core/async-connection.h"
+#include "core/async-rpc-retrying-caller-factory.h"
+#include "core/async-rpc-retrying-caller.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/raw-scan-result-consumer.h"
+#include "core/region-location.h"
+#include "core/request-converter.h"
+#include "core/response-converter.h"
+#include "core/result.h"
+#include "core/scan-result-cache.h"
+#include "core/scan.h"
+#include "exceptions/exception.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+#include "utils/connection-util.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+/**
+ * Result of a successful open-scanner RPC: the decoded ScanResponse and its cell scanner, plus
+ * the client, region location and controller used, which the follow-up scan calls reuse.
+ */
+class OpenScannerResponse {
+ public:
+  OpenScannerResponse(std::shared_ptr<hbase::RpcClient> rpc_client,
+                      const std::unique_ptr<Response>& resp,
+                      std::shared_ptr<RegionLocation> region_location,
+                      std::shared_ptr<hbase::HBaseRpcController> controller)
+      : rpc_client_(rpc_client),
+        scan_resp_(std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg())),
+        region_location_(region_location),
+        controller_(controller),
+        cell_scanner_(resp->cell_scanner()) {}
+
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  // Response message downcast to the ScanResponse type.
+  std::shared_ptr<pb::ScanResponse> scan_resp_;
+  std::shared_ptr<RegionLocation> region_location_;
+  std::shared_ptr<hbase::HBaseRpcController> controller_;
+  // Cells carried in the response's cell block, if any.
+  std::shared_ptr<CellScanner> cell_scanner_;
+};
+
+/**
+ * Drives a client-side scan region by region: opens a scanner on the region holding the current
+ * start row, hands it to an AsyncScanRpcRetryingCaller, and repeats until no more data remains.
+ * Completion and errors are reported to the RawScanResultConsumer.
+ *
+ * Instances must be owned by std::shared_ptr (callbacks use shared_from_this()); use Create().
+ */
+class AsyncClientScanner : public std::enable_shared_from_this<AsyncClientScanner> {
+ public:
+  /** Forwards all arguments to the private constructor and returns the owning shared_ptr. */
+  template <typename... Args>
+  static std::shared_ptr<AsyncClientScanner> Create(Args&&... args) {
+    return std::shared_ptr<AsyncClientScanner>(new AsyncClientScanner(std::forward<Args>(args)...));
+  }
+
+  /** Starts the scan by opening a scanner on the first region. */
+  void Start();
+
+ private:
+  // methods
+  AsyncClientScanner(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan,
+                     std::shared_ptr<pb::TableName> table_name,
+                     std::shared_ptr<RawScanResultConsumer> consumer, nanoseconds pause,
+                     uint32_t max_retries, nanoseconds scan_timeout_nanos,
+                     nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
+
+  /** Sends the open-scanner RPC for `loc` and wraps the reply in an OpenScannerResponse. */
+  folly::Future<std::shared_ptr<OpenScannerResponse>> CallOpenScanner(
+      std::shared_ptr<hbase::RpcClient> rpc_client,
+      std::shared_ptr<hbase::HBaseRpcController> controller,
+      std::shared_ptr<hbase::RegionLocation> loc);
+
+  /** Locates the region for the scan's start row (with retries) and opens a scanner there. */
+  void OpenScanner();
+
+  /** Scans the region opened by `resp`, then opens the next region or completes the scan. */
+  void StartScan(std::shared_ptr<OpenScannerResponse> resp);
+
+  /** How the start row's region should be located; currently always kCurrent. */
+  RegionLocateType GetLocateType(const Scan& scan);
+
+ private:
+  // data
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<Scan> scan_;
+  std::shared_ptr<pb::TableName> table_name_;
+  // Shared by every per-region scan caller of this scan.
+  std::shared_ptr<ScanResultCache> results_cache_;
+  std::shared_ptr<RawScanResultConsumer> consumer_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  nanoseconds scan_timeout_nanos_;
+  nanoseconds rpc_timeout_nanos_;
+  uint32_t start_log_errors_count_;
+  uint32_t max_attempts_;
+  // Reset to 1 by OpenScanner() and incremented by each CallOpenScanner().
+  uint32_t open_scanner_tries_ = 0;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
index 3c11f52..1af6e72 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h
@@ -28,7 +28,13 @@
#include "connection/rpc-client.h"
#include "core/async-batch-rpc-retrying-caller.h"
#include "core/async-rpc-retrying-caller.h"
+#include "core/async-scan-rpc-retrying-caller.h"
+#include "core/raw-scan-result-consumer.h"
+#include "core/region-location.h"
#include "core/row.h"
+#include "core/scan-result-cache.h"
+#include "core/scan.h"
+
#include "if/Client.pb.h"
#include "if/HBase.pb.h"
@@ -54,8 +60,8 @@ class SingleRequestCallerBuilder
virtual ~SingleRequestCallerBuilder() = default;
- typedef SingleRequestCallerBuilder<RESP> GenenericThisType;
- typedef std::shared_ptr<GenenericThisType> SharedThisPtr;
+ typedef SingleRequestCallerBuilder<RESP> GenericThisType;
+ typedef std::shared_ptr<GenericThisType> SharedThisPtr;
SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) {
table_name_ = table_name;
@@ -112,7 +118,7 @@ class SingleRequestCallerBuilder
private:
SharedThisPtr shared_this() {
- return std::enable_shared_from_this<GenenericThisType>::shared_from_this();
+ return std::enable_shared_from_this<GenericThisType>::shared_from_this();
}
private:
@@ -198,6 +204,114 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde
std::chrono::nanoseconds rpc_timeout_nanos_;
int32_t start_log_errors_count_ = 0;
};
+
+/**
+ * Fluent builder for AsyncScanRpcRetryingCaller. Every setter stores its argument and returns
+ * shared_from_this(), so the builder must be owned by a std::shared_ptr (as created by
+ * AsyncRpcRetryingCallerFactory::Scan()).
+ *
+ * rpc/scan timeouts, pause, max retries and the log-errors threshold default to the connection
+ * configuration. scanner_id defaults to -1 and the scanner lease timeout to zero until set.
+ */
+class ScanCallerBuilder : public std::enable_shared_from_this<ScanCallerBuilder> {
+ public:
+  // Initializers are written in member-declaration order (the order C++ actually uses), and
+  // scanner_lease_timeout_nanos_ gets an explicit value so Build() never reads an
+  // uninitialized duration when scanner_lease_timeout() was not called.
+  explicit ScanCallerBuilder(std::shared_ptr<AsyncConnection> conn,
+                             std::shared_ptr<folly::HHWheelTimer> retry_timer)
+      : conn_(conn),
+        retry_timer_(retry_timer),
+        rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()),
+        scan_timeout_nanos_(conn->connection_conf()->scan_timeout()),
+        scanner_lease_timeout_nanos_(0),
+        pause_(conn->connection_conf()->pause()),
+        max_retries_(conn->connection_conf()->max_retries()),
+        start_log_errors_count_(conn->connection_conf()->start_log_errors_count()),
+        scanner_id_(-1) {}
+
+  virtual ~ScanCallerBuilder() = default;
+
+  typedef ScanCallerBuilder GenericThisType;
+  typedef std::shared_ptr<ScanCallerBuilder> SharedThisPtr;
+
+  SharedThisPtr rpc_client(std::shared_ptr<hbase::RpcClient> rpc_client) {
+    rpc_client_ = rpc_client;
+    return shared_this();
+  }
+
+  SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) {
+    rpc_timeout_nanos_ = rpc_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr scan_timeout(nanoseconds scan_timeout_nanos) {
+    scan_timeout_nanos_ = scan_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr scanner_lease_timeout(nanoseconds scanner_lease_timeout_nanos) {
+    scanner_lease_timeout_nanos_ = scanner_lease_timeout_nanos;
+    return shared_this();
+  }
+
+  SharedThisPtr pause(nanoseconds pause) {
+    pause_ = pause;
+    return shared_this();
+  }
+
+  SharedThisPtr max_retries(uint32_t max_retries) {
+    max_retries_ = max_retries;
+    return shared_this();
+  }
+
+  SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) {
+    start_log_errors_count_ = start_log_errors_count;
+    return shared_this();
+  }
+
+  SharedThisPtr region_location(std::shared_ptr<RegionLocation> region_location) {
+    region_location_ = region_location;
+    return shared_this();
+  }
+
+  SharedThisPtr scanner_id(int64_t scanner_id) {
+    scanner_id_ = scanner_id;
+    return shared_this();
+  }
+
+  SharedThisPtr scan(std::shared_ptr<Scan> scan) {
+    scan_ = scan;
+    return shared_this();
+  }
+
+  SharedThisPtr results_cache(std::shared_ptr<ScanResultCache> results_cache) {
+    results_cache_ = results_cache;
+    return shared_this();
+  }
+
+  SharedThisPtr consumer(std::shared_ptr<RawScanResultConsumer> consumer) {
+    consumer_ = consumer;
+    return shared_this();
+  }
+
+  /** Creates the scan caller from the values collected so far. */
+  std::shared_ptr<AsyncScanRpcRetryingCaller> Build() {
+    return std::make_shared<AsyncScanRpcRetryingCaller>(
+        conn_, retry_timer_, rpc_client_, scan_, scanner_id_, results_cache_, consumer_,
+        region_location_, scanner_lease_timeout_nanos_, pause_, max_retries_, scan_timeout_nanos_,
+        rpc_timeout_nanos_, start_log_errors_count_);
+  }
+
+ private:
+  SharedThisPtr shared_this() {
+    return std::enable_shared_from_this<GenericThisType>::shared_from_this();
+  }
+
+ private:
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  std::shared_ptr<Scan> scan_;
+  nanoseconds rpc_timeout_nanos_;
+  nanoseconds scan_timeout_nanos_;
+  nanoseconds scanner_lease_timeout_nanos_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  uint32_t start_log_errors_count_;
+  std::shared_ptr<RegionLocation> region_location_;
+  int64_t scanner_id_;
+  std::shared_ptr<RawScanResultConsumer> consumer_;
+  std::shared_ptr<ScanResultCache> results_cache_;
+};  // end of ScanCallerBuilder
+
class AsyncRpcRetryingCallerFactory {
private:
std::shared_ptr<AsyncConnection> conn_;
@@ -218,6 +332,10 @@ class AsyncRpcRetryingCallerFactory {
std::shared_ptr<BatchCallerBuilder> Batch() {
return std::make_shared<BatchCallerBuilder>(conn_, retry_timer_);
}
+
+  /** Returns a new ScanCallerBuilder bound to this factory's connection and retry timer. */
+  std::shared_ptr<ScanCallerBuilder> Scan() {
+    auto builder = std::make_shared<ScanCallerBuilder>(conn_, retry_timer_);
+    return builder;
+  }
};
} // namespace hbase
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index a2c5f0e..b9d01bb 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -111,7 +111,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
Consumer<exception_wrapper> update_cached_location) {
ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
exceptions_->push_back(twec);
- if (!ShouldRetry(error) || tries_ >= max_retries_) {
+ if (!ExceptionUtil::ShouldRetry(error) || tries_ >= max_retries_) {
CompleteExceptionally();
return;
}
@@ -149,14 +149,6 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError(
}
template <typename RESP>
-bool AsyncSingleRequestRpcRetryingCaller<RESP>::ShouldRetry(const exception_wrapper& error) {
- bool do_not_retry = false;
- error.with_exception(
- [&](const RemoteException& remote_ex) { do_not_retry &= remote_ex.do_not_retry(); });
- return !do_not_retry;
-}
-
-template <typename RESP>
void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) {
int64_t call_timeout_ns;
if (operation_timeout_nanos_.count() > 0) {
@@ -220,6 +212,8 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController(
// explicit instantiations for the linker. Otherwise, you have to #include the .cc file for the
// templetized
// class definitions.
+class OpenScannerResponse;
template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
+template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>;
} /* namespace hbase */
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h
index fa3c288..c7e28d0 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.h
+++ b/hbase-native-client/core/async-rpc-retrying-caller.h
@@ -84,8 +84,6 @@ class AsyncSingleRequestRpcRetryingCaller {
void OnError(const folly::exception_wrapper& error, Supplier<std::string> err_msg,
Consumer<folly::exception_wrapper> update_cached_location);
- bool ShouldRetry(const folly::exception_wrapper& error);
-
void Call(const RegionLocation& loc);
void CompleteExceptionally();
diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.cc b/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
new file mode 100644
index 0000000..fbdf17a
--- /dev/null
+++ b/hbase-native-client/core/async-scan-rpc-retrying-caller.cc
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-scan-rpc-retrying-caller.h"
+
+namespace hbase {
+
+// Creates a resumer bound to the scan caller it will restart. mutex_ is default-constructed; it is
+// not listed in the init list (listing it before caller_ contradicted the declaration order).
+ScanResumerImpl::ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
+    : caller_(std::move(caller)) {}
+
+// Resumes a suspended scan. Only the first call has an effect. When called before Prepare(), it
+// only records kResumed so that Prepare() returns false and the scan simply continues.
+void ScanResumerImpl::Resume() {
+  // resp_ is copied under the lock. Once the state is kResumed, Prepare() no longer writes resp_,
+  // so the local copy can safely be used after the lock is released.
+  std::shared_ptr<pb::ScanResponse> local_resp;
+
+  {
+    std::unique_lock<std::mutex> mlock{mutex_};
+    if (state_ == ScanResumerState::kInitialized) {
+      // user calls this method before we call prepare, so just set the state to
+      // RESUMED, the implementation will just go on.
+      state_ = ScanResumerState::kResumed;
+      return;
+    }
+    if (state_ == ScanResumerState::kResumed) {
+      // already resumed, give up.
+      return;
+    }
+    state_ = ScanResumerState::kResumed;
+    local_resp = resp_;
+  }
+
+  // Call back into the caller without holding mutex_: CompleteOrNext() may issue the next RPC.
+  caller_->CompleteOrNext(local_resp);
+}
+
+// Records the response to continue from and moves to kSuspended. Returns false (and records
+// nothing) when Resume() already ran, meaning the scan should just go on.
+bool ScanResumerImpl::Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  const bool already_resumed = (state_ == ScanResumerState::kResumed);
+  if (!already_resumed) {
+    state_ = ScanResumerState::kSuspended;
+    resp_ = resp;
+    num_complete_rows_ = num_complete_rows;
+  }
+  return !already_resumed;
+}
+
+// Creates a controller bound to the caller that is dispatching the current results. The
+// shared_ptr parameter is a sink, so it is moved rather than copied.
+ScanControllerImpl::ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller)
+    : caller_(std::move(caller)) {}
+
+// Marks the scan as suspended and hands out a fresh resumer bound to the same caller.
+std::shared_ptr<ScanResumer> ScanControllerImpl::Suspend() {
+  PreCheck();
+  state_ = ScanControllerState::kSuspended;
+  auto fresh_resumer = std::make_shared<ScanResumerImpl>(caller_);
+  resumer_ = fresh_resumer;
+  return fresh_resumer;
+}
+
+// Requests that the scan stop after the current callback returns.
+void ScanControllerImpl::Terminate() {
+  PreCheck();
+  this->state_ = ScanControllerState::kTerminated;
+}
+
+// Returns the state the controller was in and marks it kDestroyed, so any later Suspend() or
+// Terminate() fails the state CHECK in PreCheck().
+ScanControllerState ScanControllerImpl::Destroy() {
+  return std::exchange(state_, ScanControllerState::kDestroyed);
+}
+
+// Aborts (glog CHECK) unless called on the thread that created this controller and while the
+// controller is still in kInitialized, i.e. from within OnNext/OnHeartbeat.
+void ScanControllerImpl::PreCheck() {
+  CHECK(std::this_thread::get_id() == caller_thread_id_)
+      << "The current thread is " << std::this_thread::get_id() << ", expected thread is "
+      << caller_thread_id_ << ", you should not call this method outside OnNext or OnHeartbeat";
+
+  CHECK(state_ == ScanControllerState::kInitialized) << "Invalid ScanController state "
+                                                      << DebugString(state_);
+}
+
+// Human-readable name of a ScanControllerState, used in CHECK messages.
+std::string ScanControllerImpl::DebugString(ScanControllerState state) {
+  const char* label = "UNKNOWN";
+  switch (state) {
+    case ScanControllerState::kInitialized:
+      label = "kInitialized";
+      break;
+    case ScanControllerState::kSuspended:
+      label = "kSuspended";
+      break;
+    case ScanControllerState::kTerminated:
+      label = "kTerminated";
+      break;
+    case ScanControllerState::kDestroyed:
+      label = "kDestroyed";
+      break;
+    default:
+      break;
+  }
+  return label;
+}
+
+// Human-readable name of a ScanResumerState.
+std::string ScanControllerImpl::DebugString(ScanResumerState state) {
+  const char* label = "UNKNOWN";
+  switch (state) {
+    case ScanResumerState::kInitialized:
+      label = "kInitialized";
+      break;
+    case ScanResumerState::kSuspended:
+      label = "kSuspended";
+      break;
+    case ScanResumerState::kResumed:
+      label = "kResumed";
+      break;
+    default:
+      break;
+  }
+  return label;
+}
+
+// Captures everything needed to drive an already-opened scanner. All members are initialized in
+// the mem-init list, in declaration order; shared_ptr parameters are sinks and are moved.
+AsyncScanRpcRetryingCaller::AsyncScanRpcRetryingCaller(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<Scan> scan, int64_t scanner_id,
+    std::shared_ptr<ScanResultCache> results_cache, std::shared_ptr<RawScanResultConsumer> consumer,
+    std::shared_ptr<RegionLocation> region_location, nanoseconds scanner_lease_timeout_nanos,
+    nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos,
+    nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count)
+    : conn_(std::move(conn)),
+      retry_timer_(std::move(retry_timer)),
+      rpc_client_(std::move(rpc_client)),
+      scan_(std::move(scan)),
+      scanner_id_(scanner_id),
+      results_cache_(std::move(results_cache)),
+      consumer_(std::move(consumer)),
+      region_location_(std::move(region_location)),
+      scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos),
+      pause_(pause),
+      max_retries_(max_retries),
+      scan_timeout_nanos_(scan_timeout_nanos),
+      rpc_timeout_nanos_(rpc_timeout_nanos),
+      start_log_errors_count_(start_log_errors_count),
+      promise_(std::make_shared<folly::Promise<bool>>()),
+      // conn_ is declared (and therefore initialized) before controller_.
+      controller_(conn_->CreateRpcController()),
+      start_ns_(TimeUtil::GetNowNanos()),
+      tries_(1),
+      exceptions_(std::make_shared<std::vector<ThrowableWithExtraContext>>()),
+      max_attempts_(ConnectionUtils::Retries2Attempts(max_retries)) {}
+
+// Feeds the open-scanner response through the same OnComplete() path used for every later scan
+// response, then returns the future that reports how this region's scan ended.
+folly::Future<bool> AsyncScanRpcRetryingCaller::Start(
+    std::shared_ptr<HBaseRpcController> controller,
+    std::shared_ptr<pb::ScanResponse> open_scan_resp,
+    const std::shared_ptr<CellScanner> cell_scanner) {
+  OnComplete(controller, open_scan_resp, cell_scanner);
+  auto scan_done = promise_->getFuture();
+  return scan_done;
+}
+
+// Nanoseconds left in the scan timeout, measured from start_ns_ (may be negative).
+int64_t AsyncScanRpcRetryingCaller::RemainingTimeNs() {
+  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
+  return scan_timeout_nanos_.count() - elapsed_ns;
+}
+
+// Handles one scan RPC response: converts it to results, passes them (or a heartbeat) to the
+// consumer, and then terminates, suspends, completes or issues the next RPC according to the
+// consumer's ScanController decision and the server's more-results flags.
+void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller,
+                                            std::shared_ptr<pb::ScanResponse> resp,
+                                            const std::shared_ptr<CellScanner> cell_scanner) {
+  VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_;
+
+  if (controller->Failed()) {
+    OnError(controller->exception());
+    return;
+  }
+
+  bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message();
+
+  int64_t num_complete_rows_before = results_cache_->num_complete_rows();
+  try {
+    auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner);
+
+    auto results = results_cache_->AddAndGet(raw_results, is_heartbeat);
+
+    auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this());
+
+    if (results.size() > 0) {
+      UpdateNextStartRowWhenError(*results[results.size() - 1]);
+      VLOG(5) << "Calling consumer->OnNext()";
+      consumer_->OnNext(results, scan_controller);
+    } else if (is_heartbeat) {
+      consumer_->OnHeartbeat(scan_controller);
+    }
+
+    ScanControllerState state = scan_controller->Destroy();
+    if (state == ScanControllerState::kTerminated) {
+      // The user asked to stop the scan. If the server may still hold an open scanner for us
+      // (it reported more results in this region, or did not say), close it explicitly so it
+      // does not linger until its lease expires. CloseScanner() is fire-and-forget and only logs
+      // a failure, so closing a scanner the server already closed is harmless.
+      if (!resp->has_more_results_in_region() || resp->more_results_in_region()) {
+        CloseScanner();
+      }
+      CompleteNoMoreResults();
+      return;
+    }
+
+    int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before;
+    if (state == ScanControllerState::kSuspended) {
+      if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) {
+        // Suspended: ScanResumerImpl::Resume() will call CompleteOrNext() later.
+        return;
+      }
+    }
+  } catch (const std::exception& e) {
+    // We can not retry here. The server has responded normally and the call sequence has been
+    // increased so a new scan with the same call sequence will cause an
+    // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
+    LOG(WARNING) << "Received exception in reading the scan response:" << e.what();
+    CompleteWhenError(true);
+    return;
+  }
+
+  CompleteOrNext(resp);
+}
+
+// Decides, from the server's flags in resp, whether the whole scan is done, this region is
+// exhausted, or another scan RPC on the same scanner is needed.
+void AsyncScanRpcRetryingCaller::CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp) {
+  VLOG(5) << "Scan: CompleteOrNext, scanner_id:" << scanner_id_
+          << ", response:" << resp->ShortDebugString();
+
+  if (resp->has_more_results() && !resp->more_results()) {
+    // RS tells us there is no more data for the whole scan
+    CompleteNoMoreResults();
+    return;
+  }
+  // TODO: Implement Scan::limit(), and check the limit here
+
+  if (resp->has_more_results_in_region() && !resp->more_results_in_region()) {
+    // TODO: check whether Scan is reversed here
+    CompleteWhenNoMoreResultsInRegion();
+    return;
+  }
+  Next();
+}
+
+// Fails the scan: drops cached partial results, optionally closes the server-side scanner, and
+// fails the promise with a RetriesExhaustedException carrying the collected errors.
+void AsyncScanRpcRetryingCaller::CompleteExceptionally(bool close_scanner) {
+  VLOG(5) << "Scan: CompleteExceptionally";
+  results_cache_->Clear();
+  if (close_scanner) CloseScanner();
+  promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_));
+}
+
+// Completes the promise with false: the scan has no more results.
+// NOTE: on master, region servers close a scanner once the region is exhausted; branch-1 servers
+// may not, so a backport must make sure the scanner gets closed.
+void AsyncScanRpcRetryingCaller::CompleteNoMoreResults() {
+  VLOG(5) << "Scan: CompleteNoMoreResults, scanner_id:" << scanner_id_;
+  const bool more_regions_to_scan = false;
+  promise_->setValue(more_regions_to_scan);
+}
+
+// Called when the current region is exhausted. Either finishes the scan or points the scan at
+// this region's end key so the next region can be scanned.
+// NOTE: on master, region servers close a scanner once the region is exhausted; branch-1 servers
+// may not, so a backport must make sure the scanner gets closed.
+void AsyncScanRpcRetryingCaller::CompleteWhenNoMoreResultsInRegion() {
+  VLOG(5) << "Scan: CompleteWhenNoMoreResultsInRegion, scanner_id:" << scanner_id_;
+  const auto& region_info = region_location_->region_info();
+  if (!NoMoreResultsForScan(*scan_, region_info)) {
+    CompleteWithNextStartRow(region_info.end_key(), true);
+    return;
+  }
+  CompleteNoMoreResults();
+}
+
+// Moves the scan's start row to `row` and completes the promise with true, so the upper layer
+// continues with the next region. `inclusive` is currently unused (see TODO).
+void AsyncScanRpcRetryingCaller::CompleteWithNextStartRow(std::string row, bool inclusive) {
+  VLOG(5) << "Scan: CompleteWithNextStartRow: region scan is complete, move to next region";
+  scan_->SetStartRow(row);
+  // TODO: set inclusive if it is reverse scans
+  const bool more_regions_to_scan = true;
+  promise_->setValue(more_regions_to_scan);
+}
+
+// Remembers where to restart if the scan later fails: CompleteWhenError() re-reads this row when
+// it was only partially returned, and otherwise starts right after it.
+void AsyncScanRpcRetryingCaller::UpdateNextStartRowWhenError(const Result& result) {
+  include_next_start_row_when_error_ = result.Partial();
+  next_start_row_when_error_ = optional<std::string>(result.Row());
+}
+
+void AsyncScanRpcRetryingCaller::CompleteWhenError(bool close_scanner) {
+ VLOG(5) << "Scan: CompleteWhenError, scanner_id:" << scanner_id_;
+ results_cache_->Clear();
+ if (close_scanner) {
+ CloseScanner();
+ }
+ if (next_start_row_when_error_) {
+ // TODO: HBASE-17583 adds include start / stop row to the Scan. Once we rebase and implement
+ // those options in Scan , we can start using that here.
+ scan_->SetStartRow(include_next_start_row_when_error_
+ ? *next_start_row_when_error_
+ : BytesUtil::CreateClosestRowAfter(*next_start_row_when_error_));
+ }
+ promise_->setValue(true);
+}
+
+void AsyncScanRpcRetryingCaller::OnError(const folly::exception_wrapper& error) {
+ VLOG(5) << "Scan: OnError, scanner_id:" << scanner_id_;
+ if (tries_ > start_log_errors_count_ || VLOG_IS_ON(5)) {
+ LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
+ << " for scanner id = " << scanner_id_ << " for "
+ << region_location_->region_info().ShortDebugString()
+ << " failed, , tries = " << tries_ << ", maxAttempts = " << max_attempts_
+ << ", timeout = " << TimeUtil::ToMillis(scan_timeout_nanos_).count()
+ << " ms, time elapsed = " << TimeUtil::ElapsedMillis(start_ns_) << " ms"
+ << error.what().toStdString();
+ }
+
+ bool scanner_closed = ExceptionUtil::IsScannerClosed(error);
+ ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos());
+ exceptions_->push_back(twec);
+ if (tries_ >= max_retries_) {
+ CompleteExceptionally(!scanner_closed);
+ return;
+ }
+
+ int64_t delay_ns;
+ if (scan_timeout_nanos_.count() > 0) {
+ int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+ if (max_delay_ns <= 0) {
+ CompleteExceptionally(!scanner_closed);
+ return;
+ }
+ delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1));
+ } else {
+ delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1);
+ }
+
+ if (scanner_closed) {
+ CompleteWhenError(false);
+ return;
+ }
+
+ if (ExceptionUtil::IsScannerOutOfOrder(error)) {
+ CompleteWhenError(true);
+ return;
+ }
+ if (!ExceptionUtil::ShouldRetry(error)) {
+ CompleteExceptionally(true);
+ return;
+ }
+ tries_++;
+
+ auto self(shared_from_this());
+ conn_->retry_executor()->add([&]() {
+ retry_timer_->scheduleTimeoutFn(
+ [self]() { self->conn_->cpu_executor()->add([&]() { self->Call(); }); },
+ std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns)));
+ });
+}
+
+// Returns true when nothing past this region can belong to the scan.
+bool AsyncScanRpcRetryingCaller::NoMoreResultsForScan(const Scan& scan,
+                                                      const pb::RegionInfo& info) {
+  // An empty region end key is treated as "last region": nothing lies beyond it.
+  if (BytesUtil::IsEmptyStopRow(info.end_key())) return true;
+  // Unbounded scan: keep going into the following regions.
+  if (BytesUtil::IsEmptyStopRow(scan.StopRow())) return false;
+  // Done when the stop row is below the region's end key, or equal to it. Equality ends the scan
+  // because scans always exclude the stop row for now.
+  // TODO: honor scan.IncludeStopRow() once it exists (then equality alone should not end it).
+  return BytesUtil::CompareTo(info.end_key(), scan.StopRow()) >= 0;
+}
+
+// Starts the next scan RPC on the same scanner: bumps the call sequence and resets the per-call
+// retry bookkeeping (attempt counter, collected errors, timeout clock).
+void AsyncScanRpcRetryingCaller::Next() {
+  VLOG(5) << "Scan: Next";
+  ++next_call_seq_;
+  tries_ = 1;
+  exceptions_->clear();
+  start_ns_ = TimeUtil::GetNowNanos();
+  Call();
+}
+
+// Sends one ScanRequest for next_call_seq_ and routes the reply to OnComplete() or OnError().
+void AsyncScanRpcRetryingCaller::Call() {
+  VLOG(5) << "Scan: Call";
+  auto self(shared_from_this());
+
+  // Scan RPCs carry a call sequence number, so an rpc timeout shorter than the scan timeout buys
+  // nothing. If the server does not answer in time (rare, thanks to heartbeats), resending
+  // yields an OutOfOrderScannerNextException and the scanner has to be reopened anyway. So the
+  // call timeout is whatever is left of the scan timeout, or 0 when there is no scan timeout.
+  int64_t call_timeout_nanos = 0L;
+  if (scan_timeout_nanos_.count() > 0) {
+    call_timeout_nanos = RemainingTimeNs();
+    if (call_timeout_nanos <= 0) {
+      CompleteExceptionally(true);
+      return;
+    }
+  }
+  ResetController(controller_, call_timeout_nanos);
+
+  auto scan_req =
+      RequestConverter::ToScanRequest(scanner_id_, scan_->Caching(), false, next_call_seq_, false);
+  const auto& server = region_location_->server_name();
+
+  rpc_client_
+      ->AsyncCall(server.host_name(), server.port(), std::move(scan_req),
+                  security::User::defaultUser(), "ClientService")
+      .then([self, this](const std::unique_ptr<Response>& resp) {
+        auto scan_resp = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg());
+        return OnComplete(controller_, scan_resp, resp->cell_scanner());
+      })
+      .onError([self, this](const folly::exception_wrapper& e) { OnError(e); });
+}
+
+// Sends a close request for scanner_id_ without waiting for the answer. A failure is only logged,
+// since the scanner has probably been closed by the server already.
+void AsyncScanRpcRetryingCaller::CloseScanner() {
+  auto self(shared_from_this());
+  ResetController(controller_, rpc_timeout_nanos_.count());
+
+  VLOG(5) << "Closing scanner with scanner_id:" << folly::to<std::string>(scanner_id_);
+
+  auto close_req = RequestConverter::ToScanRequest(scanner_id_, 0, true);
+  const auto& server = region_location_->server_name();
+  rpc_client_
+      ->AsyncCall(server.host_name(), server.port(), std::move(close_req),
+                  security::User::defaultUser(), "ClientService")
+      .onError([self, this](const folly::exception_wrapper& e) -> std::unique_ptr<Response> {
+        LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString()
+                     << " for closing scanner_id = " << folly::to<std::string>(scanner_id_)
+                     << " for " << region_location_->region_info().ShortDebugString()
+                     << " failed, ignore, probably already closed. Exception:"
+                     << e.what().toStdString();
+        return nullptr;
+      });
+}
+
+// Resets the controller. For a non-negative timeout, also sets its call timeout in milliseconds,
+// capped at INT_MAX.
+void AsyncScanRpcRetryingCaller::ResetController(std::shared_ptr<HBaseRpcController> controller,
+                                                 const int64_t& timeout_nanos) {
+  controller->Reset();
+  if (timeout_nanos < 0) {
+    return;
+  }
+  const int64_t capped_ms =
+      std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_nanos));
+  controller->set_call_timeout(milliseconds(capped_ms));
+}
+
+} // namespace hbase
diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.h b/hbase-native-client/core/async-scan-rpc-retrying-caller.h
new file mode 100644
index 0000000..9555e80
--- /dev/null
+++ b/hbase-native-client/core/async-scan-rpc-retrying-caller.h
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/futures/Future.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/HHWheelTimer.h>
+
+#include <algorithm>
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "connection/rpc-client.h"
+#include "core/async-connection.h"
+#include "core/hbase-rpc-controller.h"
+#include "core/raw-scan-result-consumer.h"
+#include "core/region-location.h"
+#include "core/request-converter.h"
+#include "core/response-converter.h"
+#include "core/result.h"
+#include "core/scan-result-cache.h"
+#include "core/scan.h"
+#include "exceptions/exception.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+#include "utils/bytes-util.h"
+#include "utils/connection-util.h"
+#include "utils/optional.h"
+#include "utils/sys-util.h"
+#include "utils/time-util.h"
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+class AsyncScanRpcRetryingCaller;
+
+// Resume() may be called from another thread, so ScanResumerState (guarded by mutex_) is used to
+// prevent races. The initial state is kInitialized. In most cases, after returning from OnNext or
+// OnHeartbeat, the caller calls Prepare() to change the state to kSuspended, and when the user
+// calls Resume() the state changes to kResumed. However, Resume() may be called from another
+// thread, and the user may even do this:
+//   controller->Suspend()->Resume()
+// This is strange but valid: the scan can be resumed before Prepare() does the actual suspend
+// work. So Resume() checks whether the state is kInitialized; if so, it just sets the state to
+// kResumed and returns. Likewise, if Prepare() finds the state already kResumed, it returns false
+// and lets the scan go on.
+// Resume() is meant to be called by the upper layer (the scan consumer); Prepare() is meant to be
+// called only by AsyncScanRpcRetryingCaller.
+// TODO: Unlike the Java counterpart, we do not do scan lease renewals in a background thread.
+// Since there is also no async scan API exposed to the users, the only consumer is
+// AsyncTableResultScanner, which only suspends the scanner when its result cache is full. The
+// application is expected to consume the scan results before the scanner lease timeout.
+class ScanResumerImpl : public ScanResumer {
+ public:
+  explicit ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
+
+  virtual ~ScanResumerImpl() = default;
+
+  /**
+   * Resume the scan. You are free to call it multiple times but only the first call will take
+   * effect.
+   */
+  void Resume() override;
+
+  // Returns false if the scan has already been resumed. See the comment above ScanResumerImpl
+  // for more details.
+  // NOTE(review): num_complete_rows is an int here, but it is stored in an int64_t member and the
+  // caller computes it as int64_t; consider widening the parameter.
+  bool Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows);
+
+ private:
+  // kInitialized -> kSuspended -> kResumed
+  // kInitialized -> kResumed
+  ScanResumerState state_ = ScanResumerState::kInitialized;
+  // Guards state_, resp_ and num_complete_rows_.
+  std::mutex mutex_;
+  // The response to continue from once resumed; set by Prepare().
+  std::shared_ptr<pb::ScanResponse> resp_ = nullptr;
+  int64_t num_complete_rows_ = 0;
+  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
+};
+
+// Controller handed to the consumer's OnNext/OnHeartbeat for a single batch. It lets the consumer
+// suspend or terminate the scan. It is only usable on the creating thread and until the caller
+// invokes Destroy().
+class ScanControllerImpl : public ScanController {
+ public:
+  virtual ~ScanControllerImpl() = default;
+
+  explicit ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
+
+  /**
+   * Suspend the scan.
+   * <p>
+   * This means we will stop fetching data in background, i.e., will not call onNext any more
+   * before you resume the scan.
+   * Must be called on the thread that created this controller while it is still in
+   * kInitialized; otherwise PreCheck() fails a CHECK.
+   * @return A resumer used to resume the scan later.
+   */
+  std::shared_ptr<ScanResumer> Suspend() override;
+
+  /**
+   * Terminate the scan.
+   * <p>
+   * This is useful when you have got enough results and want to stop the scan in onNext method,
+   * or you want to stop the scan in onHeartbeat method because it has spent too much time.
+   * The same thread/state requirements as Suspend() apply.
+   */
+  void Terminate() override;
+
+  // return the current state, and set the state to DESTROYED.
+  ScanControllerState Destroy();
+
+  std::shared_ptr<ScanResumerImpl> resumer() { return resumer_; }
+
+ private:
+  void PreCheck();
+
+  std::string DebugString(ScanControllerState state);
+
+  std::string DebugString(ScanResumerState state);
+
+ private:
+  // Make sure the methods are only called in this thread.
+  std::thread::id caller_thread_id_ = std::this_thread::get_id();
+  // INITIALIZED -> SUSPENDED -> DESTROYED
+  // INITIALIZED -> TERMINATED -> DESTROYED
+  // INITIALIZED -> DESTROYED
+  // If the state is incorrect, PreCheck() fails a glog CHECK (aborting the process).
+  ScanControllerState state_ = ScanControllerState::kInitialized;
+  std::shared_ptr<ScanResumerImpl> resumer_ = nullptr;
+  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
+};
+
+// Issues ScanRequests for an already-opened scanner (scanner_id_) against the server hosting
+// region_location_, retries failed calls with back-off, and feeds results to consumer_.
+// Must be owned by a std::shared_ptr: callbacks capture shared_from_this().
+class AsyncScanRpcRetryingCaller : public std::enable_shared_from_this<AsyncScanRpcRetryingCaller> {
+ public:
+  AsyncScanRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
+                             std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                             std::shared_ptr<hbase::RpcClient> rpc_client,
+                             std::shared_ptr<Scan> scan, int64_t scanner_id,
+                             std::shared_ptr<ScanResultCache> results_cache,
+                             std::shared_ptr<RawScanResultConsumer> consumer,
+                             std::shared_ptr<RegionLocation> region_location,
+                             nanoseconds scanner_lease_timeout_nanos, nanoseconds pause,
+                             uint32_t max_retries, nanoseconds scan_timeout_nanos,
+                             nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
+
+  // Processes the open-scanner response (same path as later responses) and returns a future that
+  // completes with:
+  //   true  - the scan should continue with a new scanner from scan's start row (possibly
+  //           updated by CompleteWithNextStartRow / CompleteWhenError);
+  //   false - there are no more results;
+  // or fails with RetriesExhaustedException (CompleteExceptionally).
+  folly::Future<bool> Start(std::shared_ptr<HBaseRpcController> controller,
+                            std::shared_ptr<pb::ScanResponse> open_scan_resp,
+                            const std::shared_ptr<CellScanner> cell_scanner);
+
+ private:
+  // Nanoseconds left of the scan timeout since start_ns_.
+  int64_t RemainingTimeNs();
+  // Handles one scan response (or a failed call via controller->Failed()).
+  void OnComplete(std::shared_ptr<HBaseRpcController> controller,
+                  std::shared_ptr<pb::ScanResponse> resp,
+                  const std::shared_ptr<CellScanner> cell_scanner);
+
+  // Finishes the scan/region or issues the next RPC, based on resp's more-results flags.
+  void CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp);
+
+  void CompleteExceptionally(bool close_scanner);
+
+  void CompleteNoMoreResults();
+
+  void CompleteWhenNoMoreResultsInRegion();
+
+  void CompleteWithNextStartRow(std::string row, bool inclusive);
+
+  // Records the last delivered row so a restart after an error does not repeat it.
+  void UpdateNextStartRowWhenError(const Result& result);
+
+  void CompleteWhenError(bool close_scanner);
+
+  void OnError(const folly::exception_wrapper& e);
+
+  bool NoMoreResultsForScan(const Scan& scan, const pb::RegionInfo& info);
+
+  void Next();
+
+  void Call();
+
+  // Fire-and-forget close of scanner_id_.
+  void CloseScanner();
+
+  void ResetController(std::shared_ptr<HBaseRpcController> controller,
+                       const int64_t& timeout_nanos);
+
+ private:
+  // NOTE: members are initialized in this declaration order by the constructor's init list.
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  std::shared_ptr<Scan> scan_;
+  int64_t scanner_id_;
+  std::shared_ptr<ScanResultCache> results_cache_;
+  std::shared_ptr<RawScanResultConsumer> consumer_;
+  std::shared_ptr<RegionLocation> region_location_;
+  // NOTE(review): stored but not read by async-scan-rpc-retrying-caller.cc (no lease renewal yet).
+  nanoseconds scanner_lease_timeout_nanos_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  nanoseconds scan_timeout_nanos_;
+  nanoseconds rpc_timeout_nanos_;
+  uint32_t start_log_errors_count_;
+  std::shared_ptr<folly::Promise<bool>> promise_;
+  std::shared_ptr<HBaseRpcController> controller_;
+  optional<std::string> next_start_row_when_error_ = optional<std::string>();
+  bool include_next_start_row_when_error_ = true;
+  // Start time of the current call (reset by Next()); base for RemainingTimeNs().
+  uint64_t start_ns_;
+  // 1-based attempt counter for the current call; reset to 1 by Next().
+  uint32_t tries_;
+  // Errors collected across attempts of the current call; cleared by Next().
+  std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
+  // Derived from max_retries via ConnectionUtils::Retries2Attempts.
+  uint32_t max_attempts_;
+  // Sequence number sent with each scan RPC; incremented by Next().
+  int64_t next_call_seq_ = -1L;
+
+  friend class ScanResumerImpl;
+  friend class ScanControllerImpl;
+};
+
+} // namespace hbase
diff --git a/hbase-native-client/core/async-table-result-scanner.cc b/hbase-native-client/core/async-table-result-scanner.cc
new file mode 100644
index 0000000..b1935ae
--- /dev/null
+++ b/hbase-native-client/core/async-table-result-scanner.cc
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/async-table-result-scanner.h"
+
+#include <vector>
+
+namespace hbase {
+// Creates an open scanner with an empty cache; max_cache_size bounds the cached result bytes
+// before prefetching is suspended.
+AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size)
+    : max_cache_size_(max_cache_size) {
+  cache_size_ = 0;
+  closed_ = false;
+}
+
+// Closing on destruction drops cached results and wakes any waiting reader.
+AsyncTableResultScanner::~AsyncTableResultScanner() {
+  this->Close();
+}
+
+// Marks the scanner closed, drops cached results, wakes waiting readers and, if prefetching was
+// suspended, resumes the scan so it can observe closed_ and terminate.
+void AsyncTableResultScanner::Close() {
+  std::shared_ptr<ScanResumer> local_resumer = nullptr;
+  {
+    std::unique_lock<std::mutex> mlock(mutex_);
+    closed_ = true;
+    while (!queue_.empty()) {
+      queue_.pop();
+    }
+    cache_size_ = 0;
+    // Take the resumer out under the lock so it is resumed at most once.
+    local_resumer = resumer_;
+    resumer_ = nullptr;
+  }
+  cond_.notify_all();
+
+  // Resume() outside the (non-recursive) mutex: the scan continuation may run inline on this
+  // thread and call OnNext(), which locks mutex_ again and would deadlock if we still held it.
+  // With closed_ set, OnNext() terminates the scan.
+  if (local_resumer != nullptr) {
+    local_resumer->Resume();
+  }
+}
+
+// Blocks until a result is available and returns it. Returns nullptr once the scanner is closed
+// and drained, and rethrows a recorded scan error once the cache is empty. Resumes prefetching
+// when the cache has drained to half of max_cache_size_.
+std::shared_ptr<Result> AsyncTableResultScanner::Next() {
+  VLOG(5) << "AsyncTableResultScanner: Next()";
+
+  std::shared_ptr<Result> next_result;
+  std::shared_ptr<ScanResumer> pending_resumer;
+  {
+    std::unique_lock<std::mutex> mlock(mutex_);
+    while (queue_.empty()) {
+      if (closed_) {
+        return nullptr;
+      }
+      if (error_) {
+        throw error_;
+      }
+      cond_.wait(mlock);
+    }
+    next_result = queue_.front();
+    queue_.pop();
+    cache_size_ -= EstimatedSizeWithSharedPtr(next_result);
+
+    const bool drained_enough = cache_size_ <= max_cache_size_ / 2;
+    if (resumer_ != nullptr && drained_enough) {
+      VLOG(1) << std::this_thread::get_id() << " resume scan prefetching";
+      pending_resumer = resumer_;
+      resumer_ = nullptr;
+    }
+  }
+
+  // ScanResumer::Resume() must be called outside the mutex. The folly/wangle event loop may run
+  // the logic attached (.then()) to the scan RPC on this same thread before the call returns.
+  // That is arguably the wrong behavior, but it cannot be fixed here. Because that callback can
+  // end up calling this->OnNext(), which takes the mutex, the mutex must be released first.
+  if (pending_resumer != nullptr) {
+    pending_resumer->Resume();
+  }
+  return next_result;
+}
+
+// Appends results to the queue and grows cache_size_ by their estimated footprint.
+// Expects mutex_ to be held by the caller (OnNext does so).
+void AsyncTableResultScanner::AddToCache(const std::vector<std::shared_ptr<Result>> &results) {
+  VLOG(5) << "AsyncTableResultScanner: AddToCache()";
+  // Iterate by reference: copying each shared_ptr costs an extra atomic refcount round-trip.
+  for (const auto &r : results) {
+    queue_.push(r);
+    cache_size_ += EstimatedSizeWithSharedPtr(r);
+  }
+}
+
+// Estimated cache footprint of one queued entry: the pointee's own estimate plus the shared_ptr
+// handle stored in the queue.
+template <typename T>
+inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr<T> t) {
+  const size_t handle_bytes = sizeof(std::shared_ptr<T>);
+  return t->EstimatedSize() + handle_bytes;
+}
+
+// Consumer callback: caches a batch and wakes readers. Terminates the scan if this scanner is
+// closed, and suspends prefetching once the cache reaches max_cache_size_.
+void AsyncTableResultScanner::OnNext(const std::vector<std::shared_ptr<Result>> &results,
+                                     std::shared_ptr<ScanController> controller) {
+  VLOG(5) << "AsyncTableResultScanner: OnNext()";
+  {
+    std::lock_guard<std::mutex> guard(mutex_);
+    if (closed_) {
+      controller->Terminate();
+      return;
+    }
+    AddToCache(results);
+    const bool cache_full = cache_size_ >= max_cache_size_;
+    if (cache_full) {
+      StopPrefetch(controller);
+    }
+  }
+  cond_.notify_all();
+}
+
+// Suspends the scan through its controller and keeps the resumer so Next() (or Close()) can
+// restart prefetching later. Called from OnNext with mutex_ held.
+void AsyncTableResultScanner::StopPrefetch(std::shared_ptr<ScanController> controller) {
+  VLOG(1) << std::this_thread::get_id()
+          << (": stop prefetching when scanning as the cache size " +
+              folly::to<std::string>(cache_size_) + " is greater than the max_cache_size " +
+              folly::to<std::string>(max_cache_size_));
+
+  resumer_ = controller->Suspend();
+  ++num_prefetch_stopped_;
+}
+
+/**
+ * Called when a heartbeat message arrives but not enough cells have been accumulated to call
+ * OnNext.
+ * <p>
+ * This gives a chance to terminate a slow scan; here the scan is terminated if this scanner has
+ * been closed.
+ * @param controller used to suspend or terminate the scan. It is only valid within this call;
+ *        do NOT store it and use it later outside OnHeartbeat.
+ */
+void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr<ScanController> controller) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  if (!closed_) {
+    return;
+  }
+  controller->Terminate();
+}
+
+/**
+ * Called when the scan hit an unrecoverable error and has been terminated.
+ * <p>
+ * The error is stored and rethrown from Next() once the cached results are drained. Per the
+ * consumer contract, OnComplete() is not called after OnError().
+ */
+void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) {
+  LOG(WARNING) << "Scanner received error: " << error.what();
+  std::unique_lock<std::mutex> mlock(mutex_);
+  error_ = error;
+  cond_.notify_all();
+}
+
+/**
+ * Called when the scan completed normally: marks the scanner closed so that Next() returns
+ * nullptr after the cached results are consumed, and wakes any waiting reader.
+ */
+void AsyncTableResultScanner::OnComplete() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  closed_ = true;
+  cond_.notify_all();
+}
+} // namespace hbase
diff --git a/hbase-native-client/core/async-table-result-scanner.h b/hbase-native-client/core/async-table-result-scanner.h
new file mode 100644
index 0000000..dcdf871
--- /dev/null
+++ b/hbase-native-client/core/async-table-result-scanner.h
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Logging.h>
+#include <chrono>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "core/raw-scan-result-consumer.h"
+#include "core/result-scanner.h"
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+class AsyncTableResultScanner : public ResultScanner, public RawScanResultConsumer {
+ public:
+  explicit AsyncTableResultScanner(int64_t max_cache_size);
+
+  virtual ~AsyncTableResultScanner();
+
+  void Close() override;
+
+  std::shared_ptr<Result> Next() override;
+
+  void OnNext(const std::vector<std::shared_ptr<Result>> &results,
+              std::shared_ptr<ScanController> controller) override;
+
+  /**
+   * Indicate that a heartbeat message arrived but not enough cells have accumulated to call
+   * OnNext.
+   * <p>
+   * This gives a chance to terminate a slow scan operation.
+   * @param controller used to suspend or terminate the scan. The controller is only valid within
+   *        the scope of OnHeartbeat: call its methods only from OnHeartbeat, and do NOT store
+   *        it and use it after OnHeartbeat returns.
+   */
+  void OnHeartbeat(std::shared_ptr<ScanController> controller) override;
+
+  /**
+   * Indicate that an unrecoverable error occurred and the scan operation is terminated.
+   * <p>
+   * OnComplete() will not be called after OnError().
+   */
+  void OnError(const folly::exception_wrapper &error) override;
+
+  /**
+   * Indicate that the scan operation completed normally.
+   */
+  void OnComplete() override;
+
+  // For testing: number of times prefetching was suspended because the cache was full.
+  uint32_t num_prefetch_stopped() const { return num_prefetch_stopped_; }
+
+ private:
+  void AddToCache(const std::vector<std::shared_ptr<Result>> &results);
+
+  template <typename T>
+  inline size_t EstimatedSizeWithSharedPtr(std::shared_ptr<T> t);
+
+  void StopPrefetch(std::shared_ptr<ScanController> controller);
+
+ private:
+  std::queue<std::shared_ptr<Result>> queue_;
+  std::mutex mutex_;
+  std::condition_variable cond_;
+  folly::exception_wrapper error_;
+  int64_t cache_size_ = 0;
+  int64_t max_cache_size_;
+  bool closed_ = false;
+  std::shared_ptr<ScanResumer> resumer_ = nullptr;
+  uint32_t num_prefetch_stopped_ = 0;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/cell-test.cc b/hbase-native-client/core/cell-test.cc
index efb835d..4611473 100644
--- a/hbase-native-client/core/cell-test.cc
+++ b/hbase-native-client/core/cell-test.cc
@@ -169,3 +169,27 @@ TEST(CellTest, CellDebugString) {
LOG(INFO) << cell2.DebugString();
EXPECT_EQ("row/column/42/DELETE/vlen=5/seqid=0", cell2.DebugString());
}
+
+TEST(CellTest, CellEstimatedSize) {
+  CellType cell_type = CellType::PUT;
+  int64_t timestamp = std::numeric_limits<int64_t>::max();
+
+  Cell empty{"a", "a", "", timestamp, "", cell_type};
+  Cell cell1{"aa", "a", "", timestamp, "", cell_type};
+  Cell cell2{"a", "aa", "", timestamp, "", cell_type};
+  Cell cell3{"a", "a", "a", timestamp, "", cell_type};
+  Cell cell4{"a", "a", "", timestamp, "a", cell_type};
+  Cell cell5{"a", "a", "", timestamp, "a", CellType::DELETE};
+  Cell cell6{"aaaaaa", "a", "", timestamp, "a", cell_type};
+
+  LOG(INFO) << empty.EstimatedSize();
+  LOG(INFO) << cell1.EstimatedSize();
+  // Equal total payload length => equal estimate; more payload => larger estimate.
+  EXPECT_GT(empty.EstimatedSize(), sizeof(Cell));
+  EXPECT_GT(cell1.EstimatedSize(), empty.EstimatedSize());
+  EXPECT_EQ(cell1.EstimatedSize(), cell2.EstimatedSize());
+  EXPECT_EQ(cell2.EstimatedSize(), cell3.EstimatedSize());
+  EXPECT_EQ(cell3.EstimatedSize(), cell4.EstimatedSize());
+  EXPECT_EQ(cell4.EstimatedSize(), cell5.EstimatedSize());
+  EXPECT_GT(cell6.EstimatedSize(), cell1.EstimatedSize());
+}
diff --git a/hbase-native-client/core/cell.cc b/hbase-native-client/core/cell.cc
index 24788ab..e475d49 100644
--- a/hbase-native-client/core/cell.cc
+++ b/hbase-native-client/core/cell.cc
@@ -94,21 +94,30 @@ std::string Cell::DebugString() const {
const char *Cell::TypeToString(CellType type) {
switch (type) {
- case MINIMUM:
+ case CellType::MINIMUM:
return "MINIMUM";
- case PUT:
+ case CellType::PUT:
return "PUT";
- case DELETE:
+ case CellType::DELETE:
return "DELETE";
- case DELETE_COLUMN:
+ case CellType::DELETE_COLUMN:
return "DELETE_COLUMN";
- case DELETE_FAMILY:
+ case CellType::DELETE_FAMILY:
return "DELETE_FAMILY";
- case MAXIMUM:
+ case CellType::MAXIMUM:
return "MAXIMUM";
default:
return "UNKNOWN";
}
}
+size_t Cell::EstimatedSize() const {
+  // size() rather than capacity(): under the small-string optimization a short string's
+  // capacity() is its in-object buffer (already counted in sizeof(Cell)) and ignores content.
+  size_t s = sizeof(Cell);
+  s += row_.size() + family_.size();
+  s += qualifier_.size() + value_.size();
+  return s;
+}
+
} /* namespace hbase */
diff --git a/hbase-native-client/core/cell.h b/hbase-native-client/core/cell.h
index acedd96..7a62a9b 100644
--- a/hbase-native-client/core/cell.h
+++ b/hbase-native-client/core/cell.h
@@ -24,7 +24,7 @@
namespace hbase {
-enum CellType {
+enum class CellType {
MINIMUM = 0,
PUT = 4,
DELETE = 8,
@@ -49,6 +49,9 @@ class Cell {
CellType Type() const;
int64_t SequenceId() const;
std::string DebugString() const;
+ /** Returns estimated size of the Cell object including deep heap space usage
+ * of its data. Notice that this is a very rough estimate. */
+ size_t EstimatedSize() const;
private:
std::string row_;
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index f3c5150..743e928 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -161,23 +161,23 @@ TEST_F(ClientTest, PutGetDelete) {
table->Delete(hbase::Delete{row}.AddColumn("d", "1"));
result = table->Get(get);
ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
- ASSERT_TRUE(result->Value("d", "1") == nullptr) << "Column 1 should be gone";
+ ASSERT_FALSE(result->Value("d", "1")) << "Column 1 should be gone";
EXPECT_EQ(valExtra, *(result->Value("d", "extra")));
// delete cell from column "extra" with timestamp tsExtra
table->Delete(hbase::Delete{row}.AddColumn("d", "extra", tsExtra));
result = table->Get(get);
ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
- ASSERT_TRUE(result->Value("d", "1") == nullptr) << "Column 1 should be gone";
- ASSERT_TRUE(result->Value("d", "extra") != nullptr) << "Column extra should have value";
+ ASSERT_FALSE(result->Value("d", "1")) << "Column 1 should be gone";
+ ASSERT_TRUE(result->Value("d", "extra")) << "Column extra should have value";
EXPECT_EQ(valExt, *(result->Value("d", "ext"))) << "Column ext should have value";
// delete all cells from "extra" column
table->Delete(hbase::Delete{row}.AddColumns("d", "extra"));
result = table->Get(get);
ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
- ASSERT_TRUE(result->Value("d", "1") == nullptr) << "Column 1 should be gone";
- ASSERT_TRUE(result->Value("d", "extra") == nullptr) << "Column extra should be gone";
+ ASSERT_FALSE(result->Value("d", "1")) << "Column 1 should be gone";
+ ASSERT_FALSE(result->Value("d", "extra")) << "Column extra should be gone";
EXPECT_EQ(valExt, *(result->Value("d", "ext"))) << "Column ext should have value";
// Delete the row and verify that subsequent Get returns nothing
@@ -249,6 +249,7 @@ TEST_F(ClientTest, PutGet) {
table->Close();
client.Close();
}
+
TEST_F(ClientTest, GetForNonExistentTable) {
// Create TableName and Row to be fetched from HBase
auto tn = folly::to<hbase::pb::TableName>("t_not_exists");
@@ -377,7 +378,7 @@ TEST_F(ClientTest, MultiGets) {
ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
<< " must not be empty";
EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
- EXPECT_EQ("value" + std::to_string(i), *results[i]->Value("d", std::to_string(i)).get());
+ EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
}
// We are inserting test2 twice so the below test should pass
ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must not be empty";
diff --git a/hbase-native-client/core/delete-test.cc b/hbase-native-client/core/delete-test.cc
index 19e844b..ec1e3a9 100644
--- a/hbase-native-client/core/delete-test.cc
+++ b/hbase-native-client/core/delete-test.cc
@@ -19,8 +19,8 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "core/mutation.h"
#include "core/delete.h"
+#include "core/mutation.h"
#include "utils/time-util.h"
using hbase::Delete;
diff --git a/hbase-native-client/core/delete.cc b/hbase-native-client/core/delete.cc
index b44971f..5f48782 100644
--- a/hbase-native-client/core/delete.cc
+++ b/hbase-native-client/core/delete.cc
@@ -47,14 +47,14 @@ Delete& Delete::AddColumn(const std::string& family, const std::string& qualifie
* @param timestamp version timestamp
*/
Delete& Delete::AddColumn(const std::string& family, const std::string& qualifier,
- int64_t timestamp) {
+ int64_t timestamp) {
if (timestamp < 0) {
throw std::runtime_error("Timestamp cannot be negative. ts=" +
folly::to<std::string>(timestamp));
}
- return Add(std::make_unique<Cell>(row_, family, qualifier, timestamp, "",
- hbase::CellType::DELETE));
+ return Add(
+ std::make_unique<Cell>(row_, family, qualifier, timestamp, "", hbase::CellType::DELETE));
}
/**
* Delete all versions of the specified column.
@@ -62,7 +62,7 @@ Delete& Delete::AddColumn(const std::string& family, const std::string& qualifie
* @param qualifier column qualifier
*/
Delete& Delete::AddColumns(const std::string& family, const std::string& qualifier) {
- return AddColumns(family, qualifier, timestamp_);
+ return AddColumns(family, qualifier, timestamp_);
}
/**
* Delete all versions of the specified column with a timestamp less than
@@ -72,14 +72,14 @@ Delete& Delete::AddColumns(const std::string& family, const std::string& qualifi
* @param timestamp maximum version timestamp
*/
Delete& Delete::AddColumns(const std::string& family, const std::string& qualifier,
- int64_t timestamp) {
- if (timestamp < 0) {
- throw std::runtime_error("Timestamp cannot be negative. ts=" +
- folly::to<std::string>(timestamp));
- }
+ int64_t timestamp) {
+ if (timestamp < 0) {
+ throw std::runtime_error("Timestamp cannot be negative. ts=" +
+ folly::to<std::string>(timestamp));
+ }
- return Add(std::make_unique<Cell>(row_, family, qualifier, timestamp, "",
- hbase::CellType::DELETE_COLUMN));
+ return Add(std::make_unique<Cell>(row_, family, qualifier, timestamp, "",
+ hbase::CellType::DELETE_COLUMN));
}
/**
* Delete all versions of all columns of the specified family.
@@ -88,9 +88,7 @@ Delete& Delete::AddColumns(const std::string& family, const std::string& qualifi
* specified family.
* @param family family name
*/
-Delete& Delete::AddFamily(const std::string& family) {
- return AddFamily(family, timestamp_);
-}
+Delete& Delete::AddFamily(const std::string& family) { return AddFamily(family, timestamp_); }
/**
* Delete all columns of the specified family with a timestamp less than
@@ -102,14 +100,14 @@ Delete& Delete::AddFamily(const std::string& family) {
* @param timestamp maximum version timestamp
*/
Delete& Delete::AddFamily(const std::string& family, int64_t timestamp) {
- const auto &it = family_map_.find(family);
- if (family_map_.end() != it) {
- it->second.clear();
- } else {
- family_map_[family];
- }
- return Add(std::make_unique<Cell>(row_, family, "", timestamp, "",
- hbase::CellType::DELETE_FAMILY));
+ const auto& it = family_map_.find(family);
+ if (family_map_.end() != it) {
+ it->second.clear();
+ } else {
+ family_map_[family];
+ }
+ return Add(
+ std::make_unique<Cell>(row_, family, "", timestamp, "", hbase::CellType::DELETE_FAMILY));
}
/**
* Delete all columns of the specified family with a timestamp equal to
@@ -118,8 +116,8 @@ Delete& Delete::AddFamily(const std::string& family, int64_t timestamp) {
* @param timestamp version timestamp
*/
Delete& Delete::AddFamilyVersion(const std::string& family, int64_t timestamp) {
- return Add(std::make_unique<Cell>(row_, family, "", timestamp, "",
- hbase::CellType::DELETE_FAMILY_VERSION));
+ return Add(std::make_unique<Cell>(row_, family, "", timestamp, "",
+ hbase::CellType::DELETE_FAMILY_VERSION));
}
Delete& Delete::Add(std::unique_ptr<Cell> cell) {
if (cell->Row() != row_) {
diff --git a/hbase-native-client/core/get.cc b/hbase-native-client/core/get.cc
index 52b2ec5..bc0d446 100644
--- a/hbase-native-client/core/get.cc
+++ b/hbase-native-client/core/get.cc
@@ -28,7 +28,7 @@ Get::~Get() {}
Get::Get(const std::string &row) : Row(row) {}
-Get::Get(const Get &get) {
+Get::Get(const Get &get) : Query(get) {
row_ = get.row_;
max_versions_ = get.max_versions_;
cache_blocks_ = get.cache_blocks_;
@@ -39,6 +39,7 @@ Get::Get(const Get &get) {
}
Get &Get::operator=(const Get &get) {
+ Query::operator=(get);
row_ = get.row_;
max_versions_ = get.max_versions_;
cache_blocks_ = get.cache_blocks_;
diff --git a/hbase-native-client/core/hbase-configuration-loader.h b/hbase-native-client/core/hbase-configuration-loader.h
index a1c1d3f..95b2541 100644
--- a/hbase-native-client/core/hbase-configuration-loader.h
+++ b/hbase-native-client/core/hbase-configuration-loader.h
@@ -24,15 +24,12 @@
#include <vector>
#include <boost/optional.hpp>
-#include <experimental/optional>
#include "core/configuration.h"
+#include "utils/optional.h"
namespace hbase {
-template <class T>
-using optional = std::experimental::optional<T>;
-
class HBaseConfigurationLoader {
public:
HBaseConfigurationLoader();
diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h
index 9bb89b9..33f552b 100644
--- a/hbase-native-client/core/hbase-rpc-controller.h
+++ b/hbase-native-client/core/hbase-rpc-controller.h
@@ -18,6 +18,7 @@
*/
#pragma once
+#include <folly/ExceptionWrapper.h>
#include <google/protobuf/service.h>
#include <chrono>
#include <string>
@@ -37,6 +38,10 @@ class HBaseRpcController : public google::protobuf::RpcController {
bool Failed() const override { return false; }
+  folly::exception_wrapper exception() const { return exception_; }
+  // NOTE(review): Failed() above still returns false even after set_exception() is called.
+  void set_exception(const folly::exception_wrapper& exception) { exception_ = exception; }
+
std::string ErrorText() const override { return ""; }
void StartCancel() override {}
@@ -46,6 +51,9 @@ class HBaseRpcController : public google::protobuf::RpcController {
bool IsCanceled() const override { return false; }
void NotifyOnCancel(google::protobuf::Closure* callback) override {}
+
+ private:
+ folly::exception_wrapper exception_;
};
} /* namespace hbase */
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index 0577bfc..8efecc8 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -92,19 +92,12 @@ std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp) {
throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" +
std::to_string(results.size()));
}
-
auto result = *results[0];
- // VLOG(1) << "Creating RegionLocation from received Response " << *result; TODO
-
- std::shared_ptr<std::string> region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN);
- std::shared_ptr<std::string> server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN);
- if (region_info_str == nullptr) {
- throw std::runtime_error("regioninfo column null for location");
- }
- if (server_str == nullptr) {
- throw std::runtime_error("server column null for location");
- }
+ auto region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN);
+ auto server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN);
+ CHECK(region_info_str);
+ CHECK(server_str);
auto row = result.Row();
auto region_info = folly::to<RegionInfo>(*region_info_str);
diff --git a/hbase-native-client/core/query.h b/hbase-native-client/core/query.h
index b706303..301f448 100644
--- a/hbase-native-client/core/query.h
+++ b/hbase-native-client/core/query.h
@@ -19,6 +19,8 @@
#pragma once
+#include <memory>
+
#include "core/filter.h"
namespace hbase {
@@ -28,14 +30,25 @@ namespace hbase {
*/
class Query {
public:
+  Query() = default;
+
+  // Filters may be user-defined Filter subclasses, so copies share the same filter object
+  // (the shared_ptr is copied) instead of deep-cloning it.
+  Query(const Query &query) : filter_(query.filter_) {}
+
+  Query &operator=(const Query &query) {
+    filter_ = query.filter_;
+    return *this;
+  }
+
virtual ~Query() {}
- void SetFilter(std::unique_ptr<Filter> filter) { filter_ = std::move(filter); }
+  void SetFilter(std::shared_ptr<Filter> filter) { filter_ = std::move(filter); }
- const std::unique_ptr<Filter>& filter() const { return filter_; }
+  std::shared_ptr<Filter> filter() const { return filter_; }
protected:
- std::unique_ptr<Filter> filter_ = nullptr;
+ std::shared_ptr<Filter> filter_ = nullptr;
};
} // namespace hbase
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index f71fbba..998e2f1 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -111,9 +111,10 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) {
folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
auto caller =
CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout())
- ->action([=, &del](std::shared_ptr<hbase::HBaseRpcController> controller,
- std::shared_ptr<hbase::RegionLocation> loc,
- std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> {
+ ->action([=, &del](
+ std::shared_ptr<hbase::HBaseRpcController> controller,
+ std::shared_ptr<hbase::RegionLocation> loc,
+ std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> {
return Call<hbase::Delete, hbase::Request, hbase::Response, folly::Unit>(
rpc_client, controller, loc, del, &hbase::RequestConverter::DeleteToMutateRequest,
[](const Response& r) -> folly::Unit { return folly::unit; });
@@ -143,4 +144,24 @@ folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::B
return caller->Call().then([caller](auto r) { return r; });
}
+
+void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer) {
+  // Start a scanner over a defaults-applied copy of |scan|; results arrive via |consumer|.
+  AsyncClientScanner::Create(connection_, SetDefaultScanConfig(scan), table_name_, consumer,
+                             connection_conf_->pause(), connection_conf_->max_retries(),
+                             connection_conf_->scan_timeout(), connection_conf_->rpc_timeout(),
+                             connection_conf_->start_log_errors_count())->Start();
+}
+
+std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) {
+  // Copy first (the start row may be reset later); fill non-positive settings from defaults.
+  auto configured = std::make_shared<hbase::Scan>(scan);
+  if (configured->Caching() <= 0) {
+    configured->SetCaching(default_scanner_caching_);
+  }
+  if (configured->MaxResultSize() <= 0) {
+    configured->SetMaxResultSize(default_scanner_max_result_size_);
+  }
+  return configured;
+}
} // namespace hbase
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index c8e9f2f..8c40dae 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -24,7 +24,9 @@
#include <memory>
#include <string>
#include <vector>
+
#include "core/async-batch-rpc-retrying-caller.h"
+#include "core/async-client-scanner.h"
#include "core/async-connection.h"
#include "core/async-rpc-retrying-caller-factory.h"
#include "core/async-rpc-retrying-caller.h"
@@ -34,6 +36,7 @@
#include "core/increment.h"
#include "core/put.h"
#include "core/result.h"
+#include "core/scan.h"
namespace hbase {
@@ -48,14 +51,22 @@ class RawAsyncTable {
: connection_(connection),
connection_conf_(connection->connection_conf()),
table_name_(table_name),
- rpc_client_(connection->rpc_client()) {}
+ rpc_client_(connection->rpc_client()) {
+ default_scanner_caching_ = connection_conf_->scanner_caching();
+ default_scanner_max_result_size_ = connection_conf_->scanner_max_result_size();
+ }
virtual ~RawAsyncTable() = default;
folly::Future<std::shared_ptr<Result>> Get(const hbase::Get& get);
- folly::Future<folly::Unit> Delete(const hbase::Delete& del);
- folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
+ folly::Future<folly::Unit> Delete(const hbase::Delete& del);
+
+ folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
+
folly::Future<folly::Unit> Put(const hbase::Put& put);
+
+ void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer);
+
void Close() {}
folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get(
@@ -69,6 +80,8 @@ class RawAsyncTable {
std::shared_ptr<ConnectionConfiguration> connection_conf_;
std::shared_ptr<pb::TableName> table_name_;
std::shared_ptr<RpcClient> rpc_client_;
+ int32_t default_scanner_caching_;
+ int64_t default_scanner_max_result_size_;
/* Methods */
template <typename REQ, typename PREQ, typename PRESP, typename RESP>
@@ -81,5 +94,7 @@ class RawAsyncTable {
template <typename RESP>
std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(
std::string row, std::chrono::nanoseconds rpc_timeout);
+
+ std::shared_ptr<hbase::Scan> SetDefaultScanConfig(const hbase::Scan& scan);
};
} // namespace hbase
diff --git a/hbase-native-client/core/raw-scan-result-consumer.h b/hbase-native-client/core/raw-scan-result-consumer.h
new file mode 100644
index 0000000..b7c3c48
--- /dev/null
+++ b/hbase-native-client/core/raw-scan-result-consumer.h
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Logging.h>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+// Presumably the lifecycle state of a ScanController implementation (not used in this header).
+enum class ScanControllerState { kInitialized, kSuspended, kTerminated, kDestroyed };
+// Presumably the lifecycle state of a ScanResumer implementation (not used in this header).
+enum class ScanResumerState { kInitialized, kSuspended, kResumed };
+
+/**
+ * Handle returned by ScanController::Suspend() that is used to resume a suspended scan.
+ */
+class ScanResumer {
+ public:
+  virtual ~ScanResumer() = default;
+
+  /**
+   * Resume the scan. Callers may call this multiple times, but only the first call takes
+   * effect (contract for implementations).
+   */
+  virtual void Resume() = 0;
+};
+
+/**
+ * Used to suspend or stop a scan.
+ * <p>
+ * Only call the methods below from inside OnNext or OnHeartbeat. The original contract says an
+ * IllegalStateException is thrown otherwise; NOTE(review): confirm the C++ implementation.
+ * <p>
+ * Call at most one of the methods below (Suspend or Terminate; calling neither is fine). They are
+ * not reentrant; the original contract treats calling one after another was already called as an
+ * error (IllegalStateException) -- NOTE(review): confirm what the C++ implementation does.
+ */
+class ScanController {
+ public:
+  virtual ~ScanController() = default;
+
+  /**
+   * Suspend the scan.
+   * <p>
+   * Background fetching stops, i.e. OnNext will not be called again until the scan is
+   * resumed through the returned ScanResumer.
+   * @return A resumer used to resume the scan later.
+   */
+  virtual std::shared_ptr<ScanResumer> Suspend() = 0;
+
+  /**
+   * Terminate the scan.
+   * <p>
+   * Useful when enough results have been received in OnNext, or when OnHeartbeat decides the
+   * scan has taken too much time.
+   */
+  virtual void Terminate() = 0;
+};
+
+/**
+ * Receives Result objects for an asynchronous scan.
+ * <p>
+ * OnNext is called on the thread that sends requests to the HBase service. If you want the
+ * scanner to keep fetching from HBase in the background while you process the returned data,
+ * move the processing work to another thread so that OnNext returns immediately. Do NOT
+ * perform time-consuming work in any of the methods below unless you know what you are doing.
+ * Every callback has an empty default implementation; override only what you need.
+ */
+class RawScanResultConsumer {
+ public:
+  virtual ~RawScanResultConsumer() = default;
+
+  /**
+   * Indicate that some data has been received.
+   * @param results the data fetched from HBase service.
+   * @param controller used to suspend or terminate the scan. The controller is only valid
+   *        within the scope of OnNext: call its methods only from OnNext, and do NOT store it
+   *        and use it after OnNext returns.
+   */
+  virtual void OnNext(const std::vector<std::shared_ptr<Result>> &results,
+                      std::shared_ptr<ScanController> controller) {}
+
+  /**
+   * Indicate that a heartbeat message arrived but not enough cells have accumulated to call
+   * OnNext.
+   * <p>
+   * This gives the consumer a chance to terminate a slow scan.
+   * @param controller used to suspend or terminate the scan. The controller is only valid
+   *        within the scope of OnHeartbeat: call its methods only from OnHeartbeat, and do NOT
+   *        store it and use it after OnHeartbeat returns.
+   */
+  virtual void OnHeartbeat(std::shared_ptr<ScanController> controller) {}
+
+  /**
+   * Indicate that an unrecoverable error occurred and the scan operation is terminated.
+   * <p>
+   * OnComplete() will not be called after OnError().
+   */
+  virtual void OnError(const folly::exception_wrapper &error) {}
+
+  /**
+   * Indicate that the scan operation completed normally.
+   */
+  virtual void OnComplete() {}
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index 4087d94..d5d9d67 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -26,7 +26,7 @@
namespace hbase {
-enum RegionLocateType { kBefore, kCurrent, kAfter };
+enum class RegionLocateType { kBefore, kCurrent, kAfter };
/**
* @brief class to hold where a region is located.
diff --git a/hbase-native-client/core/request-converter-test.cc b/hbase-native-client/core/request-converter-test.cc
index c01f16c..6c07a19 100644
--- a/hbase-native-client/core/request-converter-test.cc
+++ b/hbase-native-client/core/request-converter-test.cc
@@ -83,7 +83,6 @@ TEST(RequestConverter, ToScan) {
scan.SetReversed(true);
scan.SetStartRow(start_row);
scan.SetStopRow(stop_row);
- scan.SetSmall(true);
scan.SetCaching(3);
scan.SetConsistency(hbase::pb::Consistency::TIMELINE);
scan.SetCacheBlocks(true);
@@ -105,7 +104,7 @@ TEST(RequestConverter, ToScan) {
EXPECT_TRUE(msg->scan().reversed());
EXPECT_EQ(msg->scan().start_row(), start_row);
EXPECT_EQ(msg->scan().stop_row(), stop_row);
- EXPECT_TRUE(msg->scan().small());
+ EXPECT_FALSE(msg->scan().small());
EXPECT_EQ(msg->scan().caching(), 3);
EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE);
EXPECT_TRUE(msg->scan().cache_blocks());
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index a1e63fe..6eb2f04 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -53,19 +53,11 @@ std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
return pb_req;
}
-std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
- const std::string ®ion_name) {
- auto pb_req = Request::scan();
-
- auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
-
- RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
-
- auto pb_scan = pb_msg->mutable_scan();
+std::unique_ptr<hbase::pb::Scan> RequestConverter::ToScan(const Scan &scan) {
+ auto pb_scan = std::make_unique<hbase::pb::Scan>();
pb_scan->set_max_versions(scan.MaxVersions());
pb_scan->set_cache_blocks(scan.CacheBlocks());
pb_scan->set_reversed(scan.IsReversed());
- pb_scan->set_small(scan.IsSmall());
pb_scan->set_caching(scan.Caching());
pb_scan->set_start_row(scan.StartRow());
pb_scan->set_stop_row(scan.StopRow());
@@ -94,12 +86,78 @@ std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release());
}
- // TODO We will change this later.
+ return std::move(pb_scan);
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name) {
+  // Carries the full Scan spec for region_name and no scanner_id (presumably opens a scanner).
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  pb_msg->set_allocated_scan(ToScan(scan).release());
+
+  SetCommonScanRequestFields(pb_msg, false);
+
+  return pb_req;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name,
+                                                         int32_t num_rows, bool close_scanner) {
+  // Like the two-argument overload, but also sets number_of_rows and close_scanner.
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  pb_msg->set_allocated_scan(ToScan(scan).release());
+
+  pb_msg->set_number_of_rows(num_rows);
+  pb_msg->set_close_scanner(close_scanner);
+
+  SetCommonScanRequestFields(pb_msg, false);
+
+  return pb_req;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner) {
+  // Follow-up request addressed by scanner_id; no region or Scan spec is attached.
+  auto request = Request::scan();
+  auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
+  msg->set_scanner_id(scanner_id);
+  msg->set_number_of_rows(num_rows);
+  msg->set_close_scanner(close_scanner);
+
+  SetCommonScanRequestFields(msg, false);  // renew = false
+
+  return request;
+}
+
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner,
+                                                         int64_t next_call_seq_id, bool renew) {
+  // Like the (scanner_id, num_rows, close_scanner) overload, but also sends the next call
+  // sequence number and lets the caller choose the renew flag.
+  auto request = Request::scan();
+  auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
+  msg->set_scanner_id(scanner_id);
+  msg->set_number_of_rows(num_rows);
+  msg->set_close_scanner(close_scanner);
+  msg->set_next_call_seq(next_call_seq_id);
+  SetCommonScanRequestFields(msg, renew);
+  return request;
+}
+
+void RequestConverter::SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg,
+ bool renew) {
+ // TODO We will change these later when we implement partial results and heartbeats, etc
pb_msg->set_client_handles_partials(false);
pb_msg->set_client_handles_heartbeats(false);
pb_msg->set_track_scan_metrics(false);
-
- return pb_req;
+ pb_msg->set_renew(renew);
+ // TODO: set scan limit
}
std::unique_ptr<Request> RequestConverter::ToMultiRequest(
@@ -123,7 +181,6 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest(
}
}
- VLOG(3) << "Multi Req:-" << pb_req->req_msg()->ShortDebugString();
return pb_req;
}
@@ -190,13 +247,13 @@ std::unique_ptr<MutationProto> RequestConverter::ToMutation(const MutationType t
DeleteType RequestConverter::ToDeleteType(const CellType type) {
switch (type) {
- case DELETE:
+ case CellType::DELETE:
return pb::MutationProto_DeleteType_DELETE_ONE_VERSION;
- case DELETE_COLUMN:
+ case CellType::DELETE_COLUMN:
return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS;
- case DELETE_FAMILY:
+ case CellType::DELETE_FAMILY:
return pb::MutationProto_DeleteType_DELETE_FAMILY;
- case DELETE_FAMILY_VERSION:
+ case CellType::DELETE_FAMILY_VERSION:
return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION;
default:
throw std::runtime_error("Unknown delete type: " + folly::to<std::string>(type));
@@ -216,12 +273,11 @@ std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put,
pb_msg->set_allocated_mutation(
ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release());
- VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
return pb_req;
}
std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del,
- const std::string ®ion_name) {
+ const std::string ®ion_name) {
auto pb_req = Request::mutate();
auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg());
RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index 6d57161..c807f45 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -66,9 +66,20 @@ class RequestConverter {
*/
static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name);
+  /**
+   * @brief Creates a ScanRequest carrying the Scan for the given region (no scanner_id).
+   * @param num_rows value for the request's number_of_rows field.
+   * @param close_scanner value for the request's close_scanner field.
+   */
+  static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name,
+                                                int32_t num_rows, bool close_scanner);
+
+  /** @brief Creates a ScanRequest addressed to an already-open scanner by scanner_id. */
+  static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                bool close_scanner);
+
+  /**
+   * @brief Like ToScanRequest(scanner_id, num_rows, close_scanner), but also sets the
+   * request's next_call_seq and renew fields.
+   */
+  static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                bool close_scanner, int64_t next_call_seq_id,
+                                                bool renew);
+
static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion &region_requests);
- static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,const std::string &region_name);
+  static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,
+                                                        const std::string &region_name);
static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string &region_name);
@@ -91,8 +102,10 @@ class RequestConverter {
*/
static void SetRegion(const std::string &region_name, pb::RegionSpecifier *region_specifier);
static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get);
+  /** @brief Converts a client Scan into its protobuf message. */
+  static std::unique_ptr<hbase::pb::Scan> ToScan(const Scan &scan);
static DeleteType ToDeleteType(const CellType type);
static bool IsDelete(const CellType type);
+  /** @brief Fills the ScanRequest fields shared by all ToScanRequest overloads. */
+  static void SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg,
+                                         bool renew);
};
} /* namespace hbase */
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 94b7875..d2d719b 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -54,7 +54,7 @@ std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& re
}
std::shared_ptr<Result> ResponseConverter::ToResult(
- const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) {
+ const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) {
std::vector<std::shared_ptr<Cell>> vcells;
for (auto cell : result.cell()) {
std::shared_ptr<Cell> pcell =
@@ -82,34 +82,38 @@ std::shared_ptr<Result> ResponseConverter::ToResult(
std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
- VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
- int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
- : scan_resp->results_size();
+ // Delegate to the overload that works on the ScanResponse message and cell scanner directly.
+ return FromScanResponse(scan_resp, resp.cell_scanner());
+}
+
+std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
+ const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) {
+ VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString()
+ << " cell_scanner:" << (cell_scanner == nullptr);
+ int num_results =
+ cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size();
std::vector<std::shared_ptr<Result>> results{static_cast<size_t>(num_results)};
for (int i = 0; i < num_results; i++) {
- if (resp.cell_scanner() != nullptr) {
+ if (cell_scanner != nullptr) {
// Cells are out in cellblocks. Group them up again as Results. How many to read at a
// time will be found in getCellsLength -- length here is how many Cells in the i'th Result
int num_cells = scan_resp->cells_per_result(i);
std::vector<std::shared_ptr<Cell>> vcells;
- while (resp.cell_scanner()->Advance()) {
- vcells.push_back(resp.cell_scanner()->Current());
- }
- // TODO: check associated cell count?
-
- if (vcells.size() != num_cells) {
- std::string msg = "Results sent from server=" + std::to_string(num_results) +
- ". But only got " + std::to_string(i) +
- " results completely at client. Resetting the scanner to scan again.";
- LOG(ERROR) << msg;
- throw std::runtime_error(msg);
+ for (int j = 0; j < num_cells; j++) {
+ if (!cell_scanner->Advance()) {
+ std::string msg = "Results sent from server=" + std::to_string(num_results) +
+ ". But only got " + std::to_string(i) +
+ " results completely at client. Resetting the scanner to scan again.";
+ LOG(ERROR) << msg;
+ throw std::runtime_error(msg);
+ }
+ vcells.push_back(cell_scanner->Current());
}
// TODO: handle partial results per Result by checking partial_flag_per_result
results[i] = std::make_shared<Result>(vcells, false, scan_resp->stale(), false);
} else {
- results[i] = ToResult(scan_resp->results(i), resp.cell_scanner());
+ results[i] = ToResult(scan_resp->results(i), cell_scanner);
}
}
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 0fdde89..b518d1c 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -39,7 +39,7 @@ class ResponseConverter {
~ResponseConverter();
+ /** @brief Converts a protobuf Result (and its cells) into a client Result. */
static std::shared_ptr<Result> ToResult(const hbase::pb::Result& result,
- const std::unique_ptr<CellScanner>& cell_scanner);
+ const std::shared_ptr<CellScanner> cell_scanner);
/**
* @brief Returns a Result object created by PB Message in passed Response object.
@@ -51,6 +51,9 @@ class ResponseConverter {
static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);
+ /**
+ * @brief Converts a ScanResponse into Results. When cell_scanner is non-null, cells are read
+ * from it and grouped using the response's cells_per_result counts; otherwise the protobuf
+ * results embedded in the response are converted one by one.
+ * Throws std::runtime_error if the cell scanner runs out before all Results are filled.
+ */
+ static std::vector<std::shared_ptr<Result>> FromScanResponse(
+ const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner);
+
static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req,
const Response& resp);
diff --git a/hbase-native-client/core/query.h b/hbase-native-client/core/result-scanner.h
similarity index 66%
copy from hbase-native-client/core/query.h
copy to hbase-native-client/core/result-scanner.h
index b706303..9460521 100644
--- a/hbase-native-client/core/query.h
+++ b/hbase-native-client/core/result-scanner.h
@@ -19,23 +19,29 @@
#pragma once
-#include "core/filter.h"
+#include <functional>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/cell.h"
+#include "core/result.h"
namespace hbase {
/**
- * Base class for read RPC calls (Get / Scan).
+ * Interface for client-side scanning. Use Table to obtain instances.
*/
-class Query {
- public:
- virtual ~Query() {}
+class ResultScanner {
+ // TODO: should we implement forward iterators?
- void SetFilter(std::unique_ptr<Filter> filter) { filter_ = std::move(filter); }
+ public:
+ virtual ~ResultScanner() {}
- const std::unique_ptr<Filter>& filter() const { return filter_; }
+ /** Closes the scanner.
+ * NOTE(review): what gets released (client buffers, server-side scanner) is defined by
+ * implementations — document it there. */
+ virtual void Close() = 0;
- protected:
- std::unique_ptr<Filter> filter_ = nullptr;
+ /** Returns the next Result of the scan.
+ * NOTE(review): how end-of-scan is signalled (presumably nullptr) is not specified here —
+ * confirm against implementations. */
+ virtual std::shared_ptr<Result> Next() = 0;
};
-
-} // namespace hbase
+} /* namespace hbase */
diff --git a/hbase-native-client/core/result-test.cc b/hbase-native-client/core/result-test.cc
index 520f4b9..3299ffc 100644
--- a/hbase-native-client/core/result-test.cc
+++ b/hbase-native-client/core/result-test.cc
@@ -17,6 +17,7 @@
*
*/
+#include <glog/logging.h>
#include <gtest/gtest.h>
#include <limits>
#include <memory>
@@ -71,7 +72,7 @@ void PopulateCells(std::vector<std::shared_ptr<Cell> > &cells) {
}
default: {
cells.push_back(std::make_shared<Cell>(
- row, family, column, std::numeric_limits<long>::max(), value, CellType::PUT));
+ row, family, column, std::numeric_limits<int64_t>::max(), value, CellType::PUT));
}
}
}
@@ -255,7 +256,7 @@ TEST(Result, FilledResult) {
break;
}
default: {
- EXPECT_EQ(std::numeric_limits<long>::max(), version_map.first);
+ EXPECT_EQ(std::numeric_limits<int64_t>::max(), version_map.first);
EXPECT_EQ(value, version_map.second);
}
}
@@ -297,3 +298,24 @@ TEST(Result, FilledResult) {
EXPECT_EQ("value-9", qual_val_map.second);
}
}
+
+TEST(Result, ResultEstimatedSize) {
+  CellType cell_type = CellType::PUT;
+  int64_t timestamp = std::numeric_limits<int64_t>::max();
+  std::vector<std::shared_ptr<Cell> > cells;
+  Result empty(cells, true, false, false);
+
+  // EstimatedSize() adds row_.capacity(), and an empty std::string may still report a
+  // non-zero small-string-buffer capacity (15 with libstdc++'s C++11 ABI), so only a lower
+  // bound is portable here; exact equality with sizeof(Result) is ABI-dependent.
+  EXPECT_LE(sizeof(Result), empty.EstimatedSize());
+
+  // Every added cell must strictly increase the estimate. EXPECT_GT prints both operands on
+  // failure, unlike EXPECT_TRUE(a > b).
+  cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type));
+  Result result1(cells, true, false, false);
+  EXPECT_GT(result1.EstimatedSize(), empty.EstimatedSize());
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type));
+  Result result2(cells, true, false, false);
+  EXPECT_GT(result2.EstimatedSize(), result1.EstimatedSize());
+
+  LOG(INFO) << empty.EstimatedSize();
+  LOG(INFO) << result1.EstimatedSize();
+  LOG(INFO) << result2.EstimatedSize();
+}
diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc
index 9d9ddb3..44b4c86 100644
--- a/hbase-native-client/core/result.cc
+++ b/hbase-native-client/core/result.cc
@@ -25,13 +25,7 @@ Result::~Result() {}
+// Stores the given cell pointers (shared, not deep-copied) and the exists/stale/partial flags.
+// row_ is taken from the first cell, or left empty when there are no cells.
Result::Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale,
bool partial)
- : exists_(exists), stale_(stale), partial_(partial) {
- for (const auto &cell : cells) {
- cells_.push_back(cell);
- // We create the map when cells are added. unlike java where map is created
- // when result.getMap() is called
- result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
- }
+ : exists_(exists), stale_(stale), partial_(partial), cells_(cells) {
row_ = (cells_.size() == 0 ? "" : cells_[0]->Row());
}
@@ -43,10 +37,10 @@ Result::Result(const Result &result) {
if (!result.cells_.empty()) {
for (const auto &cell : result.cells_) {
cells_.push_back(cell);
- result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
}
}
}
+
const std::vector<std::shared_ptr<Cell> > &Result::Cells() const { return cells_; }
std::vector<std::shared_ptr<Cell> > Result::ColumnCells(const std::string &family,
@@ -74,13 +68,12 @@ const std::shared_ptr<Cell> Result::ColumnLatestCell(const std::string &family,
return nullptr;
}
-std::shared_ptr<std::string> Result::Value(const std::string &family,
- const std::string &qualifier) const {
+// Returns the value of ColumnLatestCell(family, qualifier), or an empty optional when no such
+// cell exists.
+optional<std::string> Result::Value(const std::string &family, const std::string &qualifier) const {
std::shared_ptr<Cell> latest_cell(ColumnLatestCell(family, qualifier));
if (latest_cell.get()) {
- return std::make_shared<std::string>(latest_cell->Value());
+ return optional<std::string>(latest_cell->Value());
}
- return nullptr;
+ return optional<std::string>();
}
bool Result::IsEmpty() const { return cells_.empty(); }
@@ -89,24 +82,33 @@ const std::string &Result::Row() const { return row_; }
int Result::Size() const { return cells_.size(); }
-const ResultMap &Result::Map() const { return result_map_; }
+// Builds family -> qualifier -> timestamp -> value from cells_ on every call (nothing is
+// cached); a later cell with the same family/qualifier/timestamp overwrites an earlier one.
+ResultMap Result::Map() const {
+ ResultMap result_map;
+ for (const auto &cell : cells_) {
+ result_map[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
+ }
+ return result_map;
+}
-const std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
+// Returns qualifier -> value for `family`, keeping only the first entry of each qualifier's
+// timestamp map (which timestamp that is depends on ResultMap's ordering, declared in result.h).
+std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
std::map<std::string, std::string> family_map;
if (!IsEmpty()) {
- for (auto itr = result_map_.begin(); itr != result_map_.end(); ++itr) {
- if (family == itr->first) {
- for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) {
- for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) {
- // We break after inserting the first value. Result.java takes only
- // the first value
- family_map[qitr->first] = vitr->second;
- break;
- }
- }
+ // NOTE(review): Map() materializes every family; only `family` is used below.
+ auto result_map = Map();
+ auto itr = result_map.find(family);
+ if (itr == result_map.end()) {
+ return family_map;
+ }
+
+ for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) {
+ for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) {
+ // We break after inserting the first value. Result.java takes only
+ // the first value
+ family_map[qitr->first] = vitr->second;
+ break;
}
}
}
+
return family_map;
}
@@ -131,4 +133,14 @@ std::string Result::DebugString() const {
return ret;
}
+/**
+ * Rough estimate of the memory held by this Result: the object itself, the row key's buffer,
+ * and for every cell the shared_ptr slot in cells_ plus the cell's own estimated size.
+ */
+size_t Result::EstimatedSize() const {
+  size_t s = sizeof(Result);
+  s += row_.capacity();
+  // Bind by const reference: copying each shared_ptr would cost an atomic inc/dec per cell.
+  for (const auto &c : cells_) {
+    s += sizeof(std::shared_ptr<Cell>);
+    // Accumulate the cell's deep size (a bare `s + ...` would compute and discard it).
+    s += c->EstimatedSize();
+  }
+  return s;
+}
+
} /* namespace hbase */
diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h
index 627d161..f18071b 100644
--- a/hbase-native-client/core/result.h
+++ b/hbase-native-client/core/result.h
@@ -26,6 +26,7 @@
#include <vector>
#include "core/cell.h"
+#include "utils/optional.h"
namespace hbase {
@@ -79,7 +80,7 @@ class Result {
* @param family - column family
* @param qualifier - column qualifier
*/
- std::shared_ptr<std::string> Value(const std::string &family, const std::string &qualifier) const;
+ optional<std::string> Value(const std::string &family, const std::string &qualifier) const;
/**
* @brief Returns if the underlying Cell vector is empty or not
@@ -104,23 +105,32 @@ class Result {
* All other map returning methods make use of this map internally
- * The Map is created when the Result instance is created
+ * The Map is rebuilt from the cells on every call; it is not cached
*/
- const ResultMap &Map() const;
+ ResultMap Map() const;
/**
* @brief Map of qualifiers to values.
* Returns a Map of the form: Map<qualifier,value>
* @param family - column family to get
*/
- const std::map<std::string, std::string> FamilyMap(const std::string &family) const;
+ std::map<std::string, std::string> FamilyMap(const std::string &family) const;
std::string DebugString() const;
+ /** Returns the `exists` flag given when this Result was constructed. */
+ bool Exists() const { return exists_; }
+
+ /** Returns the `stale` flag given when this Result was constructed. */
+ bool Stale() const { return stale_; }
+
+ /** Returns the `partial` flag given when this Result was constructed. ScanResultCache
+ * combines consecutive partial Results of the same row into one complete Result. */
+ bool Partial() const { return partial_; }
+
+ /** Returns estimated size of the Result object including deep heap space usage
+ * of its Cells and data. Notice that this is a very rough estimate. */
+ size_t EstimatedSize() const;
+
private:
bool exists_ = false;
bool stale_ = false;
bool partial_ = false;
std::string row_ = "";
std::vector<std::shared_ptr<Cell> > cells_;
- ResultMap result_map_;
};
} /* namespace hbase */
diff --git a/hbase-native-client/core/scan-result-cache-test.cc b/hbase-native-client/core/scan-result-cache-test.cc
new file mode 100644
index 0000000..0bf83ce
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache-test.cc
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "core/cell.h"
+#include "core/result.h"
+#include "core/scan-result-cache.h"
+
+using hbase::ScanResultCache;
+using hbase::Result;
+using hbase::Cell;
+using hbase::CellType;
+
+using ResultVector = std::vector<std::shared_ptr<Result>>;
+
+// Test helper: a PUT cell at the maximum timestamp whose row key and value are both the
+// decimal text of `key`.
+std::shared_ptr<Cell> CreateCell(const int32_t &key, const std::string &family,
+                                 const std::string &column) {
+  const std::string key_text = folly::to<std::string>(key);
+  const int64_t latest_ts = std::numeric_limits<int64_t>::max();
+  return std::make_shared<Cell>(key_text, family, column, latest_ts, key_text, CellType::PUT);
+}
+
+// Test helper: wraps a single cell in a Result (exists = false, stale = false).
+std::shared_ptr<Result> CreateResult(std::shared_ptr<Cell> cell, bool partial) {
+  std::vector<std::shared_ptr<Cell>> single_cell{cell};
+  return std::make_shared<Result>(single_cell, false, false, partial);
+}
+
+TEST(ScanResultCacheTest, NoPartial) {
+  ScanResultCache cache;
+  const ResultVector no_results{};
+  // Empty input yields empty output, heartbeat or not.
+  ASSERT_EQ(no_results, cache.AddAndGet(no_results, false));
+  ASSERT_EQ(no_results, cache.AddAndGet(no_results, true));
+
+  // Complete (non-partial) results pass straight through the cache.
+  const int32_t count = 10;
+  ResultVector results;
+  for (int32_t row = 0; row < count; ++row) {
+    results.push_back(CreateResult(CreateCell(row, "cf", "cq1"), false));
+  }
+  ASSERT_EQ(results, cache.AddAndGet(results, false));
+}
+
+TEST(ScanResultCacheTest, Combine1) {
+  // Sizes are compared against unsigned literals: a signed literal vs size_t inside
+  // ASSERT_EQ triggers -Wsign-compare.
+  ScanResultCache cache;
+  auto prev_result = CreateResult(CreateCell(0, "cf", "cq1"), true);
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+  auto results = cache.AddAndGet(ResultVector{prev_result, result1}, false);
+  ASSERT_EQ(1u, results.size());
+  ASSERT_EQ(prev_result, results[0]);
+
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{result2}, false).size());
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{result3}, false).size());
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{}, true).size());
+
+  results = cache.AddAndGet(ResultVector{}, false);
+  ASSERT_EQ(1u, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(3u, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3")));
+}
+
+TEST(ScanResultCacheTest, Combine2) {
+  // Sizes are compared against unsigned literals to avoid -Wsign-compare inside ASSERT_EQ.
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq2"), false);
+
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{result1}, false).size());
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{result2}, false).size());
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{result3}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{next_result1}, false);
+  ASSERT_EQ(1u, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(3u, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3")));
+
+  results = cache.AddAndGet(ResultVector{next_to_next_result1}, false);
+  ASSERT_EQ(2u, results.size());
+  ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(1u, results[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(3, folly::to<int32_t>(results[1]->Row()));
+  ASSERT_EQ(1u, results[1]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*results[1]->Value("cf", "cq2")));
+}
+
+TEST(ScanResultCacheTest, Combine3) {
+  // Sizes are compared against unsigned literals to avoid -Wsign-compare inside ASSERT_EQ.
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), false);
+  auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq1"), true);
+
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{result1}, false).size());
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{result2}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{next_result1, next_to_next_result1}, false);
+
+  ASSERT_EQ(2u, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2u, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(2, folly::to<int32_t>(results[1]->Row()));
+  ASSERT_EQ(1u, results[1]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[1]->Value("cf", "cq1")));
+
+  results = cache.AddAndGet(ResultVector{}, false);
+
+  ASSERT_EQ(1u, results.size());
+  ASSERT_EQ(3, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(1u, results[0]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+}
+
+TEST(ScanResultCacheTest, Combine4) {
+  // Sizes are compared against unsigned literals to avoid -Wsign-compare inside ASSERT_EQ.
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), false);
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto next_result2 = CreateResult(CreateCell(2, "cf", "cq2"), false);
+
+  ASSERT_EQ(0u, cache.AddAndGet(ResultVector{result1}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{result2, next_result1}, false);
+
+  ASSERT_EQ(1u, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2u, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+
+  results = cache.AddAndGet(ResultVector{next_result2}, false);
+
+  ASSERT_EQ(1u, results.size());
+  ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2u, results[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+}
+
+TEST(ScanResultCacheTest, SizeOf) {
+  // Diagnostic only (no assertions): logs sizeof() and capacity() for a few short strings.
+  const std::vector<std::string> samples{"", "f", "foo"};
+  for (const auto &sample : samples) {
+    LOG(INFO) << sizeof(sample) << " " << sample.capacity();
+  }
+}
diff --git a/hbase-native-client/core/scan-result-cache.cc b/hbase-native-client/core/scan-result-cache.cc
new file mode 100644
index 0000000..62a51e0
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache.cc
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/scan-result-cache.h"
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+/**
+ * Add the given results to cache and get valid results back. Partial Results are buffered
+ * until their row is complete; only complete rows are handed back.
+ * @param results the results of a scan next. Must not be null.
+ * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response.
+ * @return valid results, never null.
+ */
+std::vector<std::shared_ptr<Result>> ScanResultCache::AddAndGet(
+    const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat) {
+  using ResultList = std::vector<std::shared_ptr<Result>>;
+
+  if (results.empty()) {
+    // An empty response either ends the buffered row (flush it) or is a heartbeat, in which
+    // case the server may still send more cells for that row.
+    const bool flush_buffered = !is_hearthbeat && !partial_results_.empty();
+    if (!flush_buffered) {
+      return ResultList{};
+    }
+    return UpdateNumberOfCompleteResultsAndReturn(ResultList{Combine()});
+  }
+
+  // At most one Result per response is partial, and if present it is the last one.
+  auto tail = results.back();
+  if (!tail->Partial()) {
+    if (partial_results_.empty()) {
+      return UpdateNumberOfCompleteResultsAndReturn(results);
+    }
+    return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size()));
+  }
+
+  if (partial_results_.empty()) {
+    // Nothing buffered yet: return everything before the partial tail and buffer the tail.
+    partial_results_.push_back(tail);
+    ResultList complete(results.begin(), results.end() - 1);
+    return UpdateNumberOfCompleteResultsAndReturn(complete);
+  }
+
+  if (results.size() != 1) {
+    // Complete results precede the partial tail.
+    auto to_return = PrependCombined(results, results.size() - 1);
+    partial_results_.push_back(tail);
+    return UpdateNumberOfCompleteResultsAndReturn(to_return);
+  }
+
+  // A lone partial Result: keep buffering while the row key stays the same.
+  if (partial_results_.front()->Row() == tail->Row()) {
+    partial_results_.push_back(tail);
+    return ResultList{};
+  }
+  auto finished_row = Combine();
+  partial_results_.push_back(tail);
+  return UpdateNumberOfCompleteResultsAndReturn(finished_row);
+}
+
+// Drops any buffered partial Results; num_complete_rows_ is left unchanged.
+void ScanResultCache::Clear() {
+  partial_results_.clear();
+}
+
+/**
+ * Concatenates the cells of the given partial Results (all of one row) into a single Result.
+ * The merged Result is stale if any piece was stale; an empty input yields an empty Result.
+ * @throws std::runtime_error if the pieces belong to different rows, or if any piece other
+ *         than the last one is not flagged as partial.
+ */
+std::shared_ptr<Result> ScanResultCache::CreateCompleteResult(
+    const std::vector<std::shared_ptr<Result>> &partial_results) {
+  if (partial_results.empty()) {
+    return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{}, false, false, false);
+  }
+  std::vector<std::shared_ptr<Cell>> cells{};
+  bool stale = false;
+  for (size_t i = 0; i < partial_results.size(); i++) {
+    const auto &r = partial_results[i];
+    // Exceptions are thrown by value (never `throw new`): a thrown pointer leaks and is not
+    // caught by `catch (const std::runtime_error &)`.
+    if (i != 0 && partial_results[i - 1]->Row() != r->Row()) {
+      throw std::runtime_error(
+          "Cannot form complete result. Rows of partial results do not match.");
+    }
+    // Ensure that all Results except the last one are marked as partials. The last result
+    // may not be marked as a partial because Results are only marked as partials when
+    // the scan on the server side must be stopped due to reaching the maxResultSize.
+    // Visualizing it makes it easier to understand:
+    // maxResultSize: 2 cells
+    // (-x-) represents cell number x in a row
+    // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+    // How row1 will be returned by the server as partial Results:
+    // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+    // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+    // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+    if (i != partial_results.size() - 1 && !r->Partial()) {
+      throw std::runtime_error("Cannot form complete result. Result is missing partial flag.");
+    }
+    stale = stale || r->Stale();
+    const auto &piece_cells = r->Cells();
+    cells.insert(cells.end(), piece_cells.begin(), piece_cells.end());
+  }
+
+  return std::make_shared<Result>(cells, false, stale, false);
+}
+
+// Merges the buffered partial Results into one Result and starts a fresh, empty buffer.
+std::shared_ptr<Result> ScanResultCache::Combine() {
+  std::shared_ptr<Result> combined = CreateCompleteResult(partial_results_);
+  partial_results_.clear();
+  return combined;
+}
+
+// Returns the combined buffered row followed by the first `length` entries of `results`.
+// When results[0] continues the buffered row, it is folded into the combined Result first.
+std::vector<std::shared_ptr<Result>> ScanResultCache::PrependCombined(
+    const std::vector<std::shared_ptr<Result>> &results, int length) {
+  if (length == 0) {
+    return std::vector<std::shared_ptr<Result>>{Combine()};
+  }
+  // The final piece of a row may arrive without the partial flag, so compare row keys to
+  // decide whether results[0] still belongs to the buffered row.
+  const bool continues_row = partial_results_[0]->Row() == results[0]->Row();
+  size_t start = 0;
+  if (continues_row) {
+    partial_results_.push_back(results[0]);
+    start = 1;
+    --length;
+  }
+  std::vector<std::shared_ptr<Result>> prepend_results{Combine()};
+  std::copy_n(results.begin() + start, length, std::back_inserter(prepend_results));
+  return prepend_results;
+}
+
+// Single-Result convenience overload: wraps the Result and delegates to the vector overload.
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::shared_ptr<Result> &result) {
+  std::vector<std::shared_ptr<Result>> wrapped{result};
+  return UpdateNumberOfCompleteResultsAndReturn(wrapped);
+}
+
+// Counts the Results being handed back as complete rows, then returns them unchanged.
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::vector<std::shared_ptr<Result>> &results) {
+  num_complete_rows_ = num_complete_rows_ + results.size();
+  return results;
+}
+} // namespace hbase
diff --git a/hbase-native-client/core/scan-result-cache.h b/hbase-native-client/core/scan-result-cache.h
new file mode 100644
index 0000000..5d3d0ab
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Logging.h>
+#include <algorithm>
+#include <chrono>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+class ScanResultCache {
+ // In Java, there are 3 different implementations for this. Native code does not yet return
+ // partial rows to the user or support scan batching, so this version only gives back
+ // complete rows as Result: partial Results from the server are buffered and combined.
+ // It is more or less an implementation of CompleteScanResultCache.java
+
+ public:
+ /**
+ * Add the given results to cache and get valid results back.
+ * @param results the results of a scan next. Must not be null.
+ * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response.
+ * @return valid results, never null.
+ */
+ std::vector<std::shared_ptr<Result>> AddAndGet(
+ const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat);
+
+ /** Discards any buffered partial Results; num_complete_rows() is not reset. */
+ void Clear();
+
+ /** Total number of Results handed back as complete rows by AddAndGet so far. */
+ int64_t num_complete_rows() const { return num_complete_rows_; }
+
+ private:
+ /**
+ * Forms a single result from the partial results in the partialResults list. This method is
+ * useful for reconstructing partial results on the client side.
+ * Throws if the results span more than one row or a non-final result lacks the partial flag.
+ * @param partial_results list of partial results
+ * @return The complete result that is formed by combining all of the partial results together
+ */
+ static std::shared_ptr<Result> CreateCompleteResult(
+ const std::vector<std::shared_ptr<Result>> &partial_results);
+
+ /** Builds one Result from partial_results_ and empties the buffer. */
+ std::shared_ptr<Result> Combine();
+
+ /** Returns the combined buffered row (plus results[0] when it continues that row),
+ * followed by the remaining entries among the first `length` of `results`. */
+ std::vector<std::shared_ptr<Result>> PrependCombined(
+ const std::vector<std::shared_ptr<Result>> &results, int length);
+
+ /** Adds the number of returned Results to num_complete_rows_ and returns them unchanged. */
+ std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn(
+ const std::shared_ptr<Result> &result);
+
+ std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn(
+ const std::vector<std::shared_ptr<Result>> &results);
+
+ private:
+ // Pieces of the row currently being assembled, in arrival order.
+ std::vector<std::shared_ptr<Result>> partial_results_;
+ int64_t num_complete_rows_ = 0;
+};
+} // namespace hbase
diff --git a/hbase-native-client/core/scan-test.cc b/hbase-native-client/core/scan-test.cc
index 73fb6df..0ee054c 100644
--- a/hbase-native-client/core/scan-test.cc
+++ b/hbase-native-client/core/scan-test.cc
@@ -17,13 +17,13 @@
*
*/
-#include "core/scan.h"
-
+#include <gtest/gtest.h>
#include <limits>
-#include <gtest/gtest.h>
+#include "core/scan.h"
-using namespace hbase;
+using hbase::Get;
+using hbase::Scan;
void CheckFamilies(Scan &scan) {
EXPECT_EQ(false, scan.HasFamilies());
@@ -74,7 +74,7 @@ void CheckFamilies(Scan &scan) {
EXPECT_EQ(it, scan.FamilyMap().end());
}
-void CheckFamiliesAfterCopy(Scan &scan) {
+void CheckFamiliesAfterCopy(const Scan &scan) {
EXPECT_EQ(true, scan.HasFamilies());
EXPECT_EQ(3, scan.FamilyMap().size());
int i = 1;
@@ -116,11 +116,6 @@ void ScanMethods(Scan &scan) {
scan.SetStopRow(stop_row);
EXPECT_EQ(stop_row, scan.StopRow());
- scan.SetSmall(true);
- EXPECT_EQ(true, scan.IsSmall());
- scan.SetSmall(false);
- EXPECT_EQ(false, scan.IsSmall());
-
scan.SetCaching(3);
EXPECT_EQ(3, scan.Caching());
diff --git a/hbase-native-client/core/scan.cc b/hbase-native-client/core/scan.cc
index 5f315ec..6dcc51b 100644
--- a/hbase-native-client/core/scan.cc
+++ b/hbase-native-client/core/scan.cc
@@ -38,7 +38,7 @@ Scan::Scan(const std::string &start_row, const std::string &stop_row)
CheckRow(stop_row_);
}
-Scan::Scan(const Scan &scan) {
+Scan::Scan(const Scan &scan) : Query(scan) {
start_row_ = scan.start_row_;
stop_row_ = scan.stop_row_;
max_versions_ = scan.max_versions_;
@@ -47,7 +47,6 @@ Scan::Scan(const Scan &scan) {
cache_blocks_ = scan.cache_blocks_;
load_column_families_on_demand_ = scan.load_column_families_on_demand_;
reversed_ = scan.reversed_;
- small_ = scan.small_;
allow_partial_results_ = scan.allow_partial_results_;
consistency_ = scan.consistency_;
tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
@@ -55,6 +54,7 @@ Scan::Scan(const Scan &scan) {
}
Scan &Scan::operator=(const Scan &scan) {
+ Query::operator=(scan);
start_row_ = scan.start_row_;
stop_row_ = scan.stop_row_;
max_versions_ = scan.max_versions_;
@@ -63,7 +63,6 @@ Scan &Scan::operator=(const Scan &scan) {
cache_blocks_ = scan.cache_blocks_;
load_column_families_on_demand_ = scan.load_column_families_on_demand_;
reversed_ = scan.reversed_;
- small_ = scan.small_;
allow_partial_results_ = scan.allow_partial_results_;
consistency_ = scan.consistency_;
tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
@@ -109,24 +108,14 @@ void Scan::SetReversed(bool reversed) { reversed_ = reversed; }
bool Scan::IsReversed() const { return reversed_; }
-void Scan::SetStartRow(const std::string &start_row) {
- CheckRow(start_row);
- start_row_ = start_row;
-}
+void Scan::SetStartRow(const std::string &start_row) { start_row_ = start_row; }
const std::string &Scan::StartRow() const { return start_row_; }
-void Scan::SetStopRow(const std::string &stop_row) {
- CheckRow(stop_row);
- stop_row_ = stop_row;
-}
+void Scan::SetStopRow(const std::string &stop_row) { stop_row_ = stop_row; }
const std::string &Scan::StopRow() const { return stop_row_; }
-void Scan::SetSmall(bool small) { small_ = small; }
-
-bool Scan::IsSmall() const { return small_; }
-
void Scan::SetCaching(int caching) { caching_ = caching; }
int Scan::Caching() const { return caching_; }
diff --git a/hbase-native-client/core/scan.h b/hbase-native-client/core/scan.h
index fb302b7..1085c4b 100644
--- a/hbase-native-client/core/scan.h
+++ b/hbase-native-client/core/scan.h
@@ -119,16 +119,6 @@ class Scan : public Query {
const std::string &StopRow() const;
/**
- * @brief Set whether this scan is a small scan.
- */
- void SetSmall(bool small);
-
- /**
- * @brief Returns if the scan is a small scan.
- */
- bool IsSmall() const;
-
- /**
* @brief Set the number of rows for caching that will be passed to scanners.
* Higher caching values will enable faster scanners but will use more memory.
* @param caching - the number of rows for caching.
@@ -258,12 +248,11 @@ class Scan : public Query {
std::string start_row_ = "";
std::string stop_row_ = "";
uint32_t max_versions_ = 1;
- int caching_ = -1;
+ int32_t caching_ = -1;
int64_t max_result_size_ = -1;
bool cache_blocks_ = true;
bool load_column_families_on_demand_ = false;
bool reversed_ = false;
- bool small_ = false;
bool allow_partial_results_ = false;
hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG;
std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>();
diff --git a/hbase-native-client/core/scanner-test.cc b/hbase-native-client/core/scanner-test.cc
new file mode 100644
index 0000000..1ecbd02
--- /dev/null
+++ b/hbase-native-client/core/scanner-test.cc
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include "core/async-client-scanner.h"
+#include "core/async-table-result-scanner.h"
+#include "core/cell.h"
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/filter.h"
+#include "core/get.h"
+#include "core/hbase-configuration-loader.h"
+#include "core/put.h"
+#include "core/result.h"
+#include "core/row.h"
+#include "core/table.h"
+#include "if/Comparator.pb.h"
+#include "if/Filter.pb.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+#include "utils/time-util.h"
+
+using hbase::Cell;
+using hbase::ComparatorFactory;
+using hbase::Comparator;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::Put;
+using hbase::Result;
+using hbase::Scan;
+using hbase::Table;
+using hbase::TestUtil;
+using hbase::TimeUtil;
+using hbase::AsyncClientScanner;
+using hbase::AsyncTableResultScanner;
+using hbase::FilterFactory;
+using hbase::pb::CompareType;
+
+/**
+ * Test fixture for the scanner tests. A single TestUtil (and the mini cluster it starts)
+ * is created once in SetUpTestCase() and shared by every test in this suite.
+ */
+class ScannerTest : public ::testing::Test {
+ public:
+  // Shared cluster/test helper; initialized in SetUpTestCase().
+  static std::unique_ptr<hbase::TestUtil> test_util;
+  // Number of rows the tests write into their tables.
+  static const uint32_t num_rows;
+
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    // NOTE(review): argument presumably selects the mini-cluster size — confirm in TestUtil.
+    test_util->StartMiniCluster(2);
+  }
+};
+std::unique_ptr<hbase::TestUtil> ScannerTest::test_util{};
+const uint32_t ScannerTest::num_rows{1000};
+
+// Column family name for index i: "f0", "f1", ...
+std::string Family(uint32_t i) { return folly::to<std::string>("f", i); }
+
+/**
+ * Returns "row" followed by `i` in decimal, left-padded with '0' to at least `width` digits,
+ * e.g. Row(7, 3) == "row007". Longer numbers are not truncated: Row(1111, 3) == "row1111".
+ */
+std::string Row(uint32_t i, int width) {
+  // Zero-pad manually; avoids depending on <sstream>, which this file does not include.
+  std::string digits = folly::to<std::string>(i);
+  if (width > 0 && digits.size() < static_cast<std::size_t>(width)) {
+    digits.insert(0, static_cast<std::size_t>(width) - digits.size(), '0');
+  }
+  return "row" + digits;
+}
+
+// Default row key with 3-digit zero padding, e.g. Row(42) == "row042".
+std::string Row(uint32_t i) { return Row(i, /*width=*/3); }
+
+/**
+ * Builds a Put for `row` that writes, for each family Family(0)..Family(num_families - 1),
+ * qualifier "q1" = row and qualifier "q2" = row + "-" + row.
+ */
+std::unique_ptr<Put> MakePut(const std::string &row, uint32_t num_families) {
+  auto put = std::make_unique<Put>(row);
+
+  for (uint32_t i = 0; i < num_families; i++) {
+    put->AddColumn(Family(i), "q1", row);
+    put->AddColumn(Family(i), "q2", row + "-" + row);
+  }
+
+  // A returned local is moved implicitly; an explicit std::move is redundant and
+  // inhibits copy elision (-Wredundant-move).
+  return put;
+}
+
+/**
+ * Asserts that `r` matches what MakePut(expected_row, num_families) wrote. It checks that:
+ *   - the row key matches,
+ *   - there are exactly two cells per family,
+ *   - "q1" holds the row key and "q2" holds row + "-" + row.
+ * Note: a failing ASSERT_* returns from this helper only, not from the calling test.
+ */
+void CheckResult(const Result &r, const std::string &expected_row, uint32_t num_families) {
+  VLOG(1) << r.DebugString();
+  auto row = r.Row();
+  ASSERT_EQ(expected_row, row);
+  ASSERT_EQ(num_families * 2, r.Cells().size());
+  for (uint32_t i = 0; i < num_families; i++) {
+    auto q1 = r.Value(Family(i), "q1");
+    auto q2 = r.Value(Family(i), "q2");
+    // Check presence before dereferencing so a missing cell fails the test instead of crashing.
+    ASSERT_TRUE(static_cast<bool>(q1)) << "missing " << Family(i) << ":q1 in row " << row;
+    ASSERT_TRUE(static_cast<bool>(q2)) << "missing " << Family(i) << ":q2 in row " << row;
+    ASSERT_EQ(row, *q1);
+    ASSERT_EQ(row + "-" + row, *q2);
+  }
+}
+
+/**
+ * Creates `table_name` with column families Family(0)..Family(num_families - 1).
+ *
+ * When num_regions > 1, the table is pre-split with num_regions - 1 split keys
+ * Row(k * (num_rows / (num_regions - 1))), for k = 0..num_regions - 2.
+ * The first split key is Row(0), so the first region (keys sorting before "row000")
+ * holds none of the test rows.
+ */
+void CreateTable(const std::string &table_name, uint32_t num_families, uint32_t num_rows,
+                 int32_t num_regions) {
+  LOG(INFO) << "Creating the table " << table_name << " with num_regions:" << num_regions;
+  std::vector<std::string> families;
+  families.reserve(num_families);
+  for (uint32_t i = 0; i < num_families; i++) {
+    families.push_back(Family(i));
+  }
+  if (num_regions <= 1) {
+    ScannerTest::test_util->CreateTable(table_name, families);
+    return;
+  }
+  // Do the split arithmetic in unsigned types explicitly (num_regions > 1 here).
+  const auto num_split_keys = static_cast<uint32_t>(num_regions - 1);
+  const uint32_t step = num_rows / num_split_keys;
+  std::vector<std::string> keys;
+  keys.reserve(num_split_keys);
+  for (uint32_t k = 0; k < num_split_keys; k++) {
+    keys.push_back(Row(k * step));
+    LOG(INFO) << "Split key:" << keys.back();
+  }
+  ScannerTest::test_util->CreateTable(table_name, families, keys);
+}
+
+/**
+ * Creates `table_name` via CreateTable() and writes rows Row(0)..Row(num_rows - 1) with
+ * MakePut(). Returns the client that was used, so callers can open the table through it.
+ */
+std::unique_ptr<hbase::Client> CreateTableAndWriteData(const std::string &table_name,
+                                                       uint32_t num_families, uint32_t num_rows,
+                                                       int32_t num_regions) {
+  CreateTable(table_name, num_families, num_rows, num_regions);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  LOG(INFO) << "Writing data to the table, num_rows:" << num_rows;
+  // Perform Puts
+  for (uint32_t i = 0; i < num_rows; i++) {
+    table->Put(*MakePut(Row(i), num_families));
+  }
+  // A returned local is moved implicitly; an explicit std::move is redundant.
+  return client;
+}
+
+/**
+ * Runs `scan` on `table` and asserts it yields exactly `num_rows` results. The results must be
+ * the consecutive rows Row(start), Row(start + 1), ..., each with the MakePut() layout for
+ * `num_families` families.
+ */
+void TestScan(const Scan &scan, uint32_t num_families, int32_t start, int32_t num_rows,
+              Table *table) {
+  LOG(INFO) << "Starting scan for the test with start:" << scan.StartRow()
+            << ", stop:" << scan.StopRow() << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  // Count in the same signed type as `num_rows` so the final assertion does not mix
+  // signed and unsigned operands.
+  int32_t num_results = 0;
+  for (auto r = scanner->Next(); r != nullptr; r = scanner->Next()) {
+    CheckResult(*r, Row(static_cast<uint32_t>(start + num_results)), num_families);
+    num_results++;
+  }
+  ASSERT_EQ(num_rows, num_results);
+}
+
+// Single-family shorthand for TestScan(scan, num_families, start, num_rows, table).
+void TestScan(const Scan &scan, int32_t start, int32_t num_rows, Table *table) {
+  const uint32_t num_families = 1;
+  TestScan(scan, num_families, start, num_rows, table);
+}
+
+/**
+ * Scans [Row(start), Row(stop)) and expects `num_rows` consecutive rows starting at Row(start).
+ * A negative `start` or `stop` leaves that side of the scan unbounded.
+ */
+void TestScan(uint32_t num_families, int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  Scan scan{};
+  if (stop >= 0) scan.SetStopRow(Row(stop));
+  if (start < 0) {
+    // Unbounded start: results are expected to begin at Row(0), which the check below needs.
+    start = 0;
+  } else {
+    scan.SetStartRow(Row(start));
+  }
+
+  TestScan(scan, num_families, start, num_rows, table);
+}
+
+// Single-family shorthand for the integer-bounded TestScan overload.
+void TestScan(int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  const uint32_t num_families = 1;
+  TestScan(num_families, start, stop, num_rows, table);
+}
+
+/**
+ * Scans [start, stop) by raw row key and asserts it yields exactly `num_rows` results.
+ * Each result must contain `num_families` families; cell values are not checked.
+ */
+void TestScan(uint32_t num_families, const std::string &start, const std::string &stop,
+              int32_t num_rows, Table *table) {
+  Scan scan{};
+
+  scan.SetStartRow(start);
+  scan.SetStopRow(stop);
+
+  LOG(INFO) << "Starting scan for the test with start:" << start << ", stop:" << stop
+            << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  // Signed counter: compared against the signed `num_rows` below.
+  int32_t num_results = 0;
+  for (auto r = scanner->Next(); r != nullptr; r = scanner->Next()) {
+    VLOG(1) << r->DebugString();
+    num_results++;
+    ASSERT_EQ(num_families, r->Map().size());
+  }
+  ASSERT_EQ(num_rows, num_results);
+}
+
+// Single-family shorthand for the string-bounded TestScan overload.
+void TestScan(std::string start, std::string stop, int32_t num_rows, Table *table) {
+  const uint32_t num_families = 1;
+  TestScan(num_families, start, stop, num_rows, table);
+}
+
+/**
+ * Runs a fixed set of scans and checks the result counts. The table must have been filled by
+ * CreateTableAndWriteData() with 1000 rows.
+ * Integer bounds use TestScan(num_families, start, stop, ...), where -1 means unbounded.
+ * String bounds use TestScan(num_families, start_row, stop_row, ...).
+ */
+void TestScanCombinations(Table *table, uint32_t num_families) {
+  // full table
+  TestScan(num_families, -1, -1, 1000, table);
+  TestScan(num_families, -1, 999, 999, table);
+  TestScan(num_families, 0, -1, 1000, table);
+  TestScan(num_families, 0, 999, 999, table);
+  TestScan(num_families, 10, 990, 980, table);
+  TestScan(num_families, 1, 998, 997, table);
+
+  TestScan(num_families, 123, 345, 222, table);
+  TestScan(num_families, 234, 456, 222, table);
+  TestScan(num_families, 345, 567, 222, table);
+  TestScan(num_families, 456, 678, 222, table);
+
+  // single results: a scan whose start row equals its stop row is expected to return exactly
+  // that row. In the 10-region table the split keys are Row(0), Row(111), ..., Row(888).
+  TestScan(num_families, 111, 111, 1, table);
+  TestScan(num_families, 111, 112, 1, table);
+  TestScan(num_families, 332, 332, 1, table);
+  TestScan(num_families, 332, 333, 1, table);
+  TestScan(num_families, 333, 333, 1, table);
+  TestScan(num_families, 333, 334, 1, table);
+  TestScan(num_families, 42, 42, 1, table);
+  TestScan(num_families, 921, 921, 1, table);
+  TestScan(num_families, 0, 0, 1, table);
+  TestScan(num_families, 0, 1, 1, table);
+  TestScan(num_families, 999, 999, 1, table);
+
+  // few results (the single-row 0..0 case is already covered above)
+  TestScan(num_families, 0, 2, 2, table);
+  TestScan(num_families, 0, 5, 5, table);
+  TestScan(num_families, 10, 15, 5, table);
+  TestScan(num_families, 105, 115, 10, table);
+  TestScan(num_families, 111, 221, 110, table);
+  TestScan(num_families, 111, 222, 111, table);  // crossing region boundary 111-222
+  TestScan(num_families, 111, 223, 112, table);
+  TestScan(num_families, 111, 224, 113, table);
+  TestScan(num_families, 990, 999, 9, table);
+  TestScan(num_families, 900, 998, 98, table);
+
+  // empty results
+  TestScan(num_families, "a", "a", 0, table);
+  TestScan(num_families, "a", "r", 0, table);
+  TestScan(num_families, "", "r", 0, table);
+  TestScan(num_families, "s", "", 0, table);
+  TestScan(num_families, "s", "z", 0, table);
+  TestScan(num_families, Row(110) + "a", Row(111), 0, table);
+  TestScan(num_families, Row(111) + "a", Row(112), 0, table);
+  TestScan(num_families, Row(123) + "a", Row(124), 0, table);
+
+  // custom: stop rows with a different padding width than the stored keys
+  TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table);
+  TestScan(num_families, Row(0, 3), Row(0, 4), 1, table);
+  TestScan(num_families, Row(999, 3), Row(9999, 4), 1, table);
+  TestScan(num_families, Row(0, 3), Row(9999, 4), 1000, table);
+  TestScan(num_families, "a", "z", 1000, table);
+}
+
+// some of these tests are from TestAsyncTableScan* and some from TestFromClientSide* and
+// TestScannersFromClientSide*
+
+// Scan combinations against an un-split (single-region) table.
+TEST_F(ScannerTest, SingleRegionScan) {
+  const std::string table_name{"t_single_region_scan"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  TestScanCombinations(table.get(), 1);
+}
+
+// Scan combinations against a table pre-split into 10 regions.
+TEST_F(ScannerTest, MultiRegionScan) {
+  const std::string table_name{"t_multi_region_scan"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 10);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  TestScanCombinations(table.get(), 1);
+}
+
+/**
+ * Full-table scan with a tiny per-RPC result size (100 bytes), sleeping between Next() calls.
+ * It expects the AsyncTableResultScanner to stop prefetching at least once.
+ */
+TEST_F(ScannerTest, ScanWithPauses) {
+  // Use a name distinct from MultiRegionScan's "t_multi_region_scan". All tests in this suite
+  // share one mini cluster, so reusing that name would try to create an existing table.
+  const std::string table_name{"t_scan_with_pauses"};
+  auto max_result_size =
+      ScannerTest::test_util->conf()->GetInt("hbase.client.scanner.max.result.size", 2097152);
+  ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", 100);
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 5);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  VLOG(1) << "Starting scan for the test";
+  Scan scan{};
+  scan.SetCaching(100);
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = 0;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    CheckResult(*r, Row(i++), 1);
+    r = scanner->Next();
+    std::this_thread::sleep_for(TimeUtil::MillisToNanos(10));
+  }
+
+  // Restore the shared configuration before asserting. A failing ASSERT_* returns from the
+  // test immediately, which would otherwise leak the tiny result size into later tests.
+  ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", max_result_size);
+
+  auto s = static_cast<AsyncTableResultScanner *>(scanner.get());
+  ASSERT_GT(s->num_prefetch_stopped(), 0);
+
+  ASSERT_EQ(i, num_rows);
+}
+
+// A ValueFilter (value >= Row(800)) on a full scan must return exactly rows 800..999.
+TEST_F(ScannerTest, ScanWithFilters) {
+  const std::string table_name{"t_scan_with_filters"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  auto comparator = ComparatorFactory::BinaryComparator(Row(800));
+  Scan scan{};
+  scan.SetFilter(FilterFactory::ValueFilter(CompareType::GREATER_OR_EQUAL, *comparator));
+
+  TestScan(scan, 800, 200, table.get());
+}
+
+// Scan combinations against a single-region table with three column families.
+TEST_F(ScannerTest, ScanMultiFamily) {
+  const std::string table_name{"t_scan_multi_family"};
+  auto client = CreateTableAndWriteData(table_name, 3, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  TestScanCombinations(table.get(), 3);
+}
+
+/**
+ * The empty qualifier "" is a real, selectable column. Selecting it alone returns one cell;
+ * selecting the whole family also returns the "q1" cell.
+ */
+TEST_F(ScannerTest, ScanNullQualifier) {
+  std::string table_name{"t_scan_null_qualifier"};
+  std::string row{"row"};
+  CreateTable(table_name, 1, 1, 1);
+
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  // Perform Puts: two cells in Family(0), qualifiers "q1" and "".
+  Put put{row};
+  put.AddColumn(Family(0), "q1", row);
+  put.AddColumn(Family(0), "", row);
+  table->Put(put);
+
+  Scan scan1{};
+  scan1.AddColumn(Family(0), "");
+  auto scanner1 = table->Scan(scan1);
+  auto r1 = scanner1->Next();
+  ASSERT_NE(r1, nullptr);  // fail cleanly rather than dereference a missing result
+  ASSERT_EQ(r1->Cells().size(), 1u);
+  ASSERT_EQ(scanner1->Next(), nullptr);
+
+  Scan scan2{};
+  scan2.AddFamily(Family(0));
+  auto scanner2 = table->Scan(scan2);
+  auto r2 = scanner2->Next();
+  ASSERT_NE(r2, nullptr);
+  ASSERT_EQ(r2->Cells().size(), 2u);
+  ASSERT_EQ(scanner2->Next(), nullptr);
+}
+
+// Projecting onto a qualifier that was never written must produce an empty scan.
+TEST_F(ScannerTest, ScanNoResults) {
+  const std::string table_name{"t_scan_no_results"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 3);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  Scan scan{};
+  scan.AddColumn(Family(0), "non_existing_qualifier");
+
+  TestScan(scan, /*start=*/0, /*num_rows=*/0, table.get());
+}
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 3a7d62b..f79d848 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -30,6 +30,7 @@
#include "core/client.h"
#include "core/get.h"
#include "core/put.h"
+#include "core/scan.h"
#include "core/table.h"
#include "serde/server-name.h"
#include "serde/table-name.h"
@@ -38,6 +39,7 @@
using hbase::Client;
using hbase::Configuration;
using hbase::Get;
+using hbase::Scan;
using hbase::Put;
using hbase::Table;
using hbase::pb::TableName;
@@ -48,6 +50,10 @@ DEFINE_string(table, "test_table", "What table to do the reads or writes");
DEFINE_string(row, "row_", "row prefix");
DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
DEFINE_uint64(num_rows, 10000, "How many rows to write and read");
+DEFINE_bool(puts, true, "Whether to perform puts");
+DEFINE_bool(gets, true, "Whether to perform gets");
+DEFINE_bool(multigets, true, "Whether to perform multi-gets");
+DEFINE_bool(scans, true, "Whether to perform scans");
DEFINE_bool(display_results, false, "Whether to display the Results from Gets");
DEFINE_int32(threads, 6, "How many cpu threads");
@@ -86,41 +92,72 @@ int main(int argc, char *argv[]) {
auto start_ns = TimeUtil::GetNowNanos();
// Do the Put requests
- for (uint64_t i = 0; i < num_puts; i++) {
- table->Put(*MakePut(Row(FLAGS_row, i)));
- }
+ if (FLAGS_puts) {
+ LOG(INFO) << "Sending put requests";
+ for (uint64_t i = 0; i < num_puts; i++) {
+ table->Put(*MakePut(Row(FLAGS_row, i)));
+ }
- LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
- << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ }
// Do the Get requests
- start_ns = TimeUtil::GetNowNanos();
- for (uint64_t i = 0; i < num_puts; i++) {
- auto result = table->Get(Get{Row(FLAGS_row, i)});
- if (FLAGS_display_results) {
- LOG(INFO) << result->DebugString();
+ if (FLAGS_gets) {
+ LOG(INFO) << "Sending get requests";
+ start_ns = TimeUtil::GetNowNanos();
+ for (uint64_t i = 0; i < num_puts; i++) {
+ auto result = table->Get(Get{Row(FLAGS_row, i)});
+ if (FLAGS_display_results) {
+ LOG(INFO) << result->DebugString();
+ }
}
- }
- LOG(INFO) << "Successfully sent " << num_puts << " Get requests in "
- << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ LOG(INFO) << "Successfully sent " << num_puts << " Get requests in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ }
// Do the Multi-Gets
- std::vector<hbase::Get> gets;
- for (uint64_t i = 0; i < num_puts; ++i) {
- hbase::Get get(Row(FLAGS_row, i));
- gets.push_back(get);
- }
+ if (FLAGS_multigets) {
+ std::vector<hbase::Get> gets;
+ for (uint64_t i = 0; i < num_puts; ++i) {
+ hbase::Get get(Row(FLAGS_row, i));
+ gets.push_back(get);
+ }
+
+ LOG(INFO) << "Sending multi-get requests";
+ start_ns = TimeUtil::GetNowNanos();
+ auto results = table->Get(gets);
- start_ns = TimeUtil::GetNowNanos();
- auto results = table->Get(gets);
+ if (FLAGS_display_results) {
+ for (const auto &result : results) LOG(INFO) << result->DebugString();
+ }
- if (FLAGS_display_results) {
- for (const auto &result : results) LOG(INFO) << result->DebugString();
+ LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
- LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in "
- << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ // Do the Scan
+ if (FLAGS_scans) {
+ LOG(INFO) << "Starting scanner";
+ start_ns = TimeUtil::GetNowNanos();
+ Scan scan{};
+ auto scanner = table->Scan(scan);
+
+ uint64_t i = 0;
+ auto r = scanner->Next();
+ while (r != nullptr) {
+ if (FLAGS_display_results) {
+ LOG(INFO) << r->DebugString();
+ }
+ r = scanner->Next();
+ i++;
+ }
+
+ LOG(INFO) << "Successfully iterated over " << i << " Scan results in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ scanner->Close();
+ }
table->Close();
client->Close();
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index aa51989..9cdd8b0 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -25,6 +25,7 @@
#include <vector>
#include "core/async-connection.h"
+#include "core/async-table-result-scanner.h"
#include "core/request-converter.h"
#include "core/response-converter.h"
#include "if/Client.pb.h"
@@ -52,6 +53,21 @@ std::shared_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
return context.get(operation_timeout());
}
+/**
+ * Starts `scan` on the underlying async table and returns a blocking scanner over its results.
+ * The scanner's cache budget comes from ResultSize2CacheSize(). Its input is the scan's own
+ * max result size when positive, otherwise the connection's scanner_max_result_size().
+ */
+std::shared_ptr<ResultScanner> Table::Scan(const hbase::Scan &scan) {
+  const int64_t result_size =
+      scan.MaxResultSize() > 0 ? scan.MaxResultSize()
+                               : async_connection_->connection_conf()->scanner_max_result_size();
+  auto result_scanner =
+      std::make_shared<AsyncTableResultScanner>(ResultSize2CacheSize(result_size));
+  async_table_->Scan(scan, result_scanner);
+  return result_scanner;
+}
+
+/**
+ * Returns twice `max_results_size`. Values above INT64_MAX / 2, where doubling would
+ * overflow, are returned unchanged.
+ */
+int64_t Table::ResultSize2CacheSize(int64_t max_results_size) const {
+  constexpr int64_t kDoublingLimit = std::numeric_limits<int64_t>::max() / 2;
+  if (max_results_size > kDoublingLimit) {
+    return max_results_size;
+  }
+  return max_results_size * 2;
+}
+
void Table::Put(const hbase::Put &put) {
auto future = async_table_->Put(put);
future.get(operation_timeout());
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 81ddc8e..1f6d9b7 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -32,6 +32,7 @@
#include "core/location-cache.h"
#include "core/put.h"
#include "core/raw-async-table.h"
+#include "core/result-scanner.h"
#include "core/result.h"
#include "serde/table-name.h"
@@ -74,6 +75,8 @@ class Table {
std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment);
// TODO: Batch Puts
+ std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan);
+
/**
* @brief - Close the client connection.
*/
@@ -92,5 +95,7 @@ class Table {
private:
std::chrono::milliseconds operation_timeout() const;
+
+ int64_t ResultSize2CacheSize(int64_t max_results_size) const;
};
} /* namespace hbase */
diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK
index eef4437..07ffeaf 100644
--- a/hbase-native-client/exceptions/BUCK
+++ b/hbase-native-client/exceptions/BUCK
@@ -17,8 +17,12 @@
cxx_library(
name="exceptions",
- exported_headers=["exception.h",],
+ exported_headers=[
+ "exception.h",
+ ],
srcs=[],
- deps=["//third-party:folly",],
+ deps=[
+ "//third-party:folly",
+ ],
compiler_flags=['-Weffc++'],
- visibility=['//core/...','//connection//...'],)
\ No newline at end of file
+ visibility=['//core/...', '//connection//...'],)
diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h
index f25fbea..9cbd7ae 100644
--- a/hbase-native-client/exceptions/exception.h
+++ b/hbase-native-client/exceptions/exception.h
@@ -18,25 +18,22 @@
*/
#pragma once
+#include <folly/ExceptionWrapper.h>
+#include <folly/io/IOBuf.h>
#include <exception>
#include <string>
#include <vector>
-#include <folly/io/IOBuf.h>
-#include <folly/ExceptionWrapper.h>
namespace hbase {
class ThrowableWithExtraContext {
-public:
- ThrowableWithExtraContext(folly::exception_wrapper cause,
- const long& when) :
- cause_(cause), when_(when), extras_("") {
- }
+ public:
+ ThrowableWithExtraContext(folly::exception_wrapper cause, const long& when)
+ : cause_(cause), when_(when), extras_("") {}
- ThrowableWithExtraContext(folly::exception_wrapper cause,
- const long& when, const std::string& extras) :
- cause_(cause), when_(when), extras_(extras) {
- }
+ ThrowableWithExtraContext(folly::exception_wrapper cause, const long& when,
+ const std::string& extras)
+ : cause_(cause), when_(when), extras_(extras) {}
virtual std::string ToString() {
// TODO:
@@ -44,55 +41,45 @@ public:
return extras_ + ", " + cause_.what().toStdString();
}
- virtual folly::exception_wrapper cause() {
- return cause_;
- }
-private:
+ virtual folly::exception_wrapper cause() { return cause_; }
+
+ private:
folly::exception_wrapper cause_;
long when_;
std::string extras_;
};
-class IOException: public std::logic_error {
-public:
+class IOException : public std::logic_error {
+ public:
IOException() : logic_error("") {}
- IOException(
- const std::string& what) :
- logic_error(what) {}
- IOException(
- const std::string& what,
- folly::exception_wrapper cause) :
- logic_error(what), cause_(cause) {}
+ IOException(const std::string& what) : logic_error(what) {}
+ IOException(const std::string& what, folly::exception_wrapper cause)
+ : logic_error(what), cause_(cause) {}
virtual ~IOException() = default;
- virtual folly::exception_wrapper cause() {
- return cause_;
- }
-private:
+ virtual folly::exception_wrapper cause() { return cause_; }
+
+ private:
folly::exception_wrapper cause_;
};
-class RetriesExhaustedException: public IOException {
-public:
- RetriesExhaustedException(
- const int& num_retries,
- std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) :
- IOException(
- GetMessage(num_retries, exceptions),
- exceptions->empty() ? folly::exception_wrapper{}
- : (*exceptions)[exceptions->size() - 1].cause()){
- }
+class RetriesExhaustedException : public IOException {
+ public:
+ RetriesExhaustedException(const int& num_retries,
+ std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions)
+ : IOException(GetMessage(num_retries, exceptions),
+ exceptions->empty() ? folly::exception_wrapper{}
+ : (*exceptions)[exceptions->size() - 1].cause()) {}
virtual ~RetriesExhaustedException() = default;
-private:
- std::string GetMessage(
- const int& num_retries,
- std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) {
+ private:
+ std::string GetMessage(const int& num_retries,
+ std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) {
std::string buffer("Failed after attempts=");
buffer.append(std::to_string(num_retries + 1));
buffer.append(", exceptions:\n");
- for (auto it = exceptions->begin(); it != exceptions->end(); it++) {
+ for (auto it = exceptions->begin(); it != exceptions->end(); it++) {
buffer.append(it->ToString());
buffer.append("\n");
}
@@ -100,74 +87,141 @@ private:
}
};
-class HBaseIOException : public IOException {
-};
+class HBaseIOException : public IOException {};
class RemoteException : public IOException {
-public:
-
+ public:
RemoteException() : port_(0), do_not_retry_(false) {}
- RemoteException(const std::string& what) :
- IOException(what), port_(0), do_not_retry_(false) {}
+ RemoteException(const std::string& what) : IOException(what), port_(0), do_not_retry_(false) {}
- RemoteException(
- const std::string& what,
- folly::exception_wrapper cause) :
- IOException(what, cause), port_(0), do_not_retry_(false) {}
+ RemoteException(const std::string& what, folly::exception_wrapper cause)
+ : IOException(what, cause), port_(0), do_not_retry_(false) {}
virtual ~RemoteException() = default;
- std::string exception_class_name() const {
- return exception_class_name_;
- }
+ std::string exception_class_name() const { return exception_class_name_; }
RemoteException* set_exception_class_name(const std::string& value) {
exception_class_name_ = value;
return this;
}
- std::string stack_trace() const {
- return stack_trace_;
- }
+ std::string stack_trace() const { return stack_trace_; }
RemoteException* set_stack_trace(const std::string& value) {
stack_trace_ = value;
return this;
}
- std::string hostname() const {
- return hostname_;
- }
+ std::string hostname() const { return hostname_; }
RemoteException* set_hostname(const std::string& value) {
hostname_ = value;
return this;
}
- int port() const {
- return port_;
- }
+ int port() const { return port_; }
RemoteException* set_port(int value) {
port_ = value;
return this;
}
- bool do_not_retry() const {
- return do_not_retry_;
- }
+ bool do_not_retry() const { return do_not_retry_; }
RemoteException* set_do_not_retry(bool value) {
do_not_retry_ = value;
return this;
}
-private:
+ private:
std::string exception_class_name_;
std::string stack_trace_;
std::string hostname_;
int port_;
bool do_not_retry_;
};
+
+/**
+ * Static helpers that interpret exceptions raised on the Java (server) side. The C++ client
+ * sees these only as a RemoteException carrying the Java class name. Decisions are therefore
+ * made by matching that name against known Java exception classes.
+ */
+class ExceptionUtil {
+ private:
+  // unknown scanner and sub-classes
+  static constexpr const char* kUnknownScannerException =
+      "org.apache.hadoop.hbase.UnknownScannerException";
+
+  // not serving region and sub-classes
+  static constexpr const char* kNotServingRegionException =
+      "org.apache.hadoop.hbase.NotServingRegionException";
+  static constexpr const char* kRegionInRecoveryException =
+      "org.apache.hadoop.hbase.exceptions.RegionInRecoveryException";
+  static constexpr const char* kRegionOpeningException =
+      "org.apache.hadoop.hbase.exceptions.RegionOpeningException";
+  static constexpr const char* kRegionMovedException =
+      "org.apache.hadoop.hbase.exceptions.RegionMovedException";
+
+  // Region server stopped and sub-classes
+  static constexpr const char* kRegionServerStoppedException =
+      "org.apache.hadoop.hbase.regionserver.RegionServerStoppedException";
+  static constexpr const char* kRegionServerAbortedException =
+      "org.apache.hadoop.hbase.regionserver.RegionServerAbortedException";
+
+  // other scanner related exceptions
+  static constexpr const char* kOutOfOrderScannerNextException =
+      "org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException";
+  static constexpr const char* kScannerResetException =
+      "org.apache.hadoop.hbase.exceptions.ScannerResetException";
+
+  /**
+   * Returns true iff `exception` holds a RemoteException whose Java class name equals one of
+   * `class_names`. Any other exception type yields false.
+   */
+  template <std::size_t N>
+  static bool IsRemoteExceptionOfClass(const folly::exception_wrapper& exception,
+                                       const char* const (&class_names)[N]) {
+    bool matched = false;
+    exception.with_exception([&](const RemoteException& remote_ex) {
+      const std::string java_class = remote_ex.exception_class_name();
+      for (const char* class_name : class_names) {
+        if (java_class == class_name) {
+          matched = true;
+          break;
+        }
+      }
+    });
+    return matched;
+  }
+
+ public:
+  /**
+   * Returns false only when the exception is a RemoteException flagged do_not_retry.
+   * Every other exception is considered retriable.
+   */
+  static bool ShouldRetry(const folly::exception_wrapper& error) {
+    bool retry = true;
+    error.with_exception(
+        [&](const RemoteException& remote_ex) { retry = !remote_ex.do_not_retry(); });
+    return retry;
+  }
+
+  /**
+   * Returns whether the server had already closed the scanner when this remote exception
+   * was raised.
+   *
+   * In Java, client and server share one Exception class hierarchy. The client rethrows the
+   * server-side exception after un-wrapping it from a RemoteException or ServiceException
+   * (see ConnectionUtils.translateException() in Java). That hierarchy is not available in
+   * C++, so this is a fragile, explicit list of the Java types that extend
+   * UnknownScannerException, NotServingRegionException or RegionServerStoppedException
+   * (the three base classes included).
+   */
+  static bool IsScannerClosed(const folly::exception_wrapper& exception) {
+    static constexpr const char* kScannerClosingClasses[] = {
+        kUnknownScannerException,      kNotServingRegionException,
+        kRegionInRecoveryException,    kRegionOpeningException,
+        kRegionMovedException,         kRegionServerStoppedException,
+        kRegionServerAbortedException};
+    return IsRemoteExceptionOfClass(exception, kScannerClosingClasses);
+  }
+
+  /**
+   * Returns whether the remote exception reports an out-of-order scanner next call or
+   * a scanner reset.
+   */
+  static bool IsScannerOutOfOrder(const folly::exception_wrapper& exception) {
+    static constexpr const char* kOutOfOrderClasses[] = {kOutOfOrderScannerNextException,
+                                                         kScannerResetException};
+    return IsRemoteExceptionOfClass(exception, kOutOfOrderClasses);
+  }
+};
} // namespace hbase
diff --git a/hbase-native-client/test-util/BUCK b/hbase-native-client/test-util/BUCK
index 4998614..7c92841 100644
--- a/hbase-native-client/test-util/BUCK
+++ b/hbase-native-client/test-util/BUCK
@@ -16,33 +16,37 @@
# limitations under the License.
import os
-cxx_library(name="test-util",
- exported_headers=[
- "test-util.h","mini-cluster.h"
- ],
- srcs=["test-util.cc","mini-cluster.cc"],
- deps=[
- "//third-party:folly",
- "//core:core",
+cxx_library(
+ name="test-util",
+ exported_headers=["test-util.h", "mini-cluster.h"],
+ srcs=["test-util.cc", "mini-cluster.cc"],
+ deps=[
+ "//third-party:folly",
+ "//core:core",
],
- preprocessor_flags= [
- '-I' + os.environ['JAVA_HOME'] + '/include',
- '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
- '-I' + os.environ['JAVA_HOME'] + '/include/linux'],
- exported_preprocessor_flags= [
- '-I' + os.environ['JAVA_HOME'] + '/include',
- '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
- '-I' + os.environ['JAVA_HOME'] + '/include/linux'],
- compiler_flags = [
- '-I' + os.environ['JAVA_HOME'] + '/include',
- '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
- '-I' + os.environ['JAVA_HOME'] + '/include/linux', '-ggdb'],
- linker_flags = ['-ljvm',
- '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server', '-ggdb'],
- exported_linker_flags = ['-ljvm',
- '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server',
- '-Wl,-rpath=' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server'],
- visibility=[
- 'PUBLIC',
- ],
- )
+ preprocessor_flags=[
+ '-I' + os.environ['JAVA_HOME'] + '/include',
+ '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
+ '-I' + os.environ['JAVA_HOME'] + '/include/linux'
+ ],
+ exported_preprocessor_flags=[
+ '-I' + os.environ['JAVA_HOME'] + '/include',
+ '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
+ '-I' + os.environ['JAVA_HOME'] + '/include/linux'
+ ],
+ compiler_flags=[
+ '-I' + os.environ['JAVA_HOME'] + '/include',
+ '-I' + os.environ['JAVA_HOME'] + '/include/darwin',
+ '-I' + os.environ['JAVA_HOME'] + '/include/linux', '-ggdb'
+ ],
+ linker_flags=[
+ '-ljvm', '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server',
+ '-ggdb'
+ ],
+ exported_linker_flags=[
+ '-ljvm', '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server',
+ '-Wl,-rpath=' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server'
+ ],
+ visibility=[
+ 'PUBLIC',
+ ],)
diff --git a/hbase-native-client/test-util/mini-cluster.cc b/hbase-native-client/test-util/mini-cluster.cc
index 34da54c..688ea8e 100644
--- a/hbase-native-client/test-util/mini-cluster.cc
+++ b/hbase-native-client/test-util/mini-cluster.cc
@@ -119,6 +119,9 @@ void MiniCluster::Setup() {
env_->GetMethodID(testing_util_class_, "createTable",
"(Lorg/apache/hadoop/hbase/TableName;Ljava/lang/String;)Lorg/"
"apache/hadoop/hbase/client/Table;");
+ create_table_families_mid_ = env_->GetMethodID(testing_util_class_, "createTable",
+ "(Lorg/apache/hadoop/hbase/TableName;[[B)Lorg/"
+ "apache/hadoop/hbase/client/Table;");
create_table_with_split_mid_ = env_->GetMethodID(
testing_util_class_, "createTable",
"(Lorg/apache/hadoop/hbase/TableName;[[B[[B)Lorg/apache/hadoop/hbase/client/Table;");
@@ -198,20 +201,45 @@ jobject MiniCluster::CreateTable(const std::string &table, const std::string &fa
return table_obj;
}
+jobject MiniCluster::CreateTable(const std::string &table,
+ const std::vector<std::string> &families) {
+ // Calls Java createTable(TableName, byte[][]) on htu_ (one byte[] per family); returns its Table.
+ jstring table_name_str = env_->NewStringUTF(table.c_str());
+ jobject table_name = env_->CallStaticObjectMethod(table_name_class_, tbl_name_value_of_mid_, table_name_str);
+ jclass array_element_type = env_->FindClass("[B");
+ // JNI lengths/indices are jsize; convert explicitly instead of narrowing size_t implicitly.
+ jobjectArray family_array =
+ env_->NewObjectArray(static_cast<jsize>(families.size()), array_element_type, nullptr);
+ jsize i = 0;
+ for (const auto &family : families) {  // const& avoids copying each family name
+ env_->SetObjectArrayElement(family_array, i++, StrToByteChar(family));
+ }
+ return env_->CallObjectMethod(htu_, create_table_families_mid_, table_name, family_array);
+}
+
jobject MiniCluster::CreateTable(const std::string &table, const std::string &family,
const std::vector<std::string> &keys) {
+ // Single-family convenience overload: delegate with a one-element family list.
+ const std::vector<std::string> families{family};
+ return CreateTable(table, families, keys);
+}
+
+jobject MiniCluster::CreateTable(const std::string &table, const std::vector<std::string> &families,
+ const std::vector<std::string> &keys) {
jstring table_name_str = env_->NewStringUTF(table.c_str());
jobject table_name =
env_->CallStaticObjectMethod(table_name_class_, tbl_name_value_of_mid_, table_name_str);
jclass array_element_type = env_->FindClass("[B");
- jobjectArray family_array = env_->NewObjectArray(1, array_element_type, env_->NewByteArray(1));
- env_->SetObjectArrayElement(family_array, 0, StrToByteChar(family));
+ int i = 0;
+ jobjectArray family_array = env_->NewObjectArray(families.size(), array_element_type, nullptr);
+ for (auto family : families) {
+ env_->SetObjectArrayElement(family_array, i++, StrToByteChar(family));
+ }
- jobjectArray key_array =
- env_->NewObjectArray(keys.size(), array_element_type, env_->NewByteArray(1));
+ jobjectArray key_array = env_->NewObjectArray(keys.size(), array_element_type, nullptr);
- int i = 0;
+ i = 0;
for (auto key : keys) {
env_->SetObjectArrayElement(key_array, i++, StrToByteChar(key));
}
diff --git a/hbase-native-client/test-util/mini-cluster.h b/hbase-native-client/test-util/mini-cluster.h
index 4119cb5..b8ac391 100644
--- a/hbase-native-client/test-util/mini-cluster.h
+++ b/hbase-native-client/test-util/mini-cluster.h
@@ -18,9 +18,9 @@
*/
#pragma once
+#include <jni.h>
#include <string>
#include <vector>
-#include "jni.h"
namespace hbase {
@@ -29,8 +29,11 @@ class MiniCluster {
jobject StartCluster(int32_t num_region_servers);
void StopCluster();
jobject CreateTable(const std::string &table, const std::string &family);
+ jobject CreateTable(const std::string &table, const std::vector<std::string> &families);
jobject CreateTable(const std::string &table, const std::string &family,
const std::vector<std::string> &keys);
+ jobject CreateTable(const std::string &table, const std::vector<std::string> &families,
+ const std::vector<std::string> &keys);
jobject StopRegionServer(int idx);
// moves region to server
@@ -51,6 +54,7 @@ class MiniCluster {
jmethodID set_conf_mid_;
jmethodID tbl_name_value_of_mid_;
jmethodID create_table_mid_;
+ jmethodID create_table_families_mid_;
jmethodID create_table_with_split_mid_;
jmethodID put_mid_;
jmethodID put_ctor_;
diff --git a/hbase-native-client/test-util/test-util.cc b/hbase-native-client/test-util/test-util.cc
index c4e6ed2..26862d8 100644
--- a/hbase-native-client/test-util/test-util.cc
+++ b/hbase-native-client/test-util/test-util.cc
@@ -65,11 +65,20 @@ void TestUtil::CreateTable(const std::string &table, const std::string &family)
mini_->CreateTable(table, family);
}
+void TestUtil::CreateTable(const std::string &table,
+ const std::vector<std::string> &families) {
+ this->mini_->CreateTable(table, families); }
+
void TestUtil::CreateTable(const std::string &table, const std::string &family,
const std::vector<std::string> &keys) {
mini_->CreateTable(table, family, keys);
}
+void TestUtil::CreateTable(const std::string &table,
+ const std::vector<std::string> &families, const std::vector<std::string> &keys) {
+ this->mini_->CreateTable(table, families, keys);
+}
+
void TestUtil::StartStandAloneInstance() {
auto p = temp_dir_.path().string();
auto cmd = std::string{"bin/start-local-hbase.sh " + p};
diff --git a/hbase-native-client/test-util/test-util.h b/hbase-native-client/test-util/test-util.h
index 5729674..e26558b 100644
--- a/hbase-native-client/test-util/test-util.h
+++ b/hbase-native-client/test-util/test-util.h
@@ -59,8 +59,11 @@ class TestUtil {
void StopMiniCluster();
void CreateTable(const std::string &table, const std::string &family);
+ void CreateTable(const std::string &table, const std::vector<std::string> &families);
void CreateTable(const std::string &table, const std::string &family,
const std::vector<std::string> &keys);
+ void CreateTable(const std::string &table, const std::vector<std::string> &families,
+ const std::vector<std::string> &keys);
void StartStandAloneInstance();
void StopStandAloneInstance();
diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/utils/BUCK
index ed8e114..788056b 100644
--- a/hbase-native-client/utils/BUCK
+++ b/hbase-native-client/utils/BUCK
@@ -20,6 +20,7 @@ cxx_library(
exported_headers=[
"bytes-util.h",
"connection-util.h",
+ "optional.h",
"sys-util.h",
"time-util.h",
"user-util.h",
diff --git a/hbase-native-client/utils/bytes-util-test.cc b/hbase-native-client/utils/bytes-util-test.cc
index ca64a21..16af021 100644
--- a/hbase-native-client/utils/bytes-util-test.cc
+++ b/hbase-native-client/utils/bytes-util-test.cc
@@ -47,6 +47,7 @@ TEST(TestBytesUtil, TestToStringBinary) {
EXPECT_EQ("foo_\\x00\\xFF_bar",
BytesUtil::ToStringBinary("foo_" + std::string{zero} + std::string{max} + "_bar"));
}
+
TEST(TestBytesUtil, TestToStringToInt64) {
int64_t num = 761235;
EXPECT_EQ(num, BytesUtil::ToInt64(BytesUtil::ToString(num)));
@@ -57,3 +58,13 @@ TEST(TestBytesUtil, TestToStringToInt64) {
num = 0;
EXPECT_EQ(num, BytesUtil::ToInt64(BytesUtil::ToString(num)));
}
+
+TEST(TestBytesUtil, TestCreateClosestRowAfter) {
+ // An empty row maps to a single NUL byte.
+ EXPECT_EQ(std::string(1, '\0'), BytesUtil::CreateClosestRowAfter(std::string{}));
+ // A non-empty row gets exactly one NUL byte appended.
+ const std::string foo = "foo";
+ EXPECT_EQ(std::string{"foo"} + '\0', BytesUtil::CreateClosestRowAfter(foo));
+ // Same property, checked through the printable binary representation.
+ EXPECT_EQ("f\\x00", BytesUtil::ToStringBinary(BytesUtil::CreateClosestRowAfter("f")));
+}
diff --git a/hbase-native-client/utils/bytes-util.h b/hbase-native-client/utils/bytes-util.h
index 3566d62..6221bf0 100644
--- a/hbase-native-client/utils/bytes-util.h
+++ b/hbase-native-client/utils/bytes-util.h
@@ -42,7 +42,27 @@ class BytesUtil {
*/
static std::string ToStringBinary(const std::string& b, size_t off, size_t len);
- static std::string ToString(int64_t amt);
- static long ToInt64(std::string str);
+ static std::string ToString(int64_t value);
+ /** Parses a value produced by ToString(int64_t); the two round-trip. */
+ static int64_t ToInt64(std::string str);
+ /** True when `row` is the empty string (an empty start row). */
+ static bool IsEmptyStartRow(const std::string& row) { return row.empty(); }
+ /** True when `row` is the empty string (an empty stop row). */
+ static bool IsEmptyStopRow(const std::string& row) { return row.empty(); }
+
+ /** Three-way comparison of a and b in std::string order: returns -1, 0 or 1. */
+ static int32_t CompareTo(const std::string& a, const std::string& b) {
+ // std::string::compare only guarantees the sign of its result; normalize to -1/0/1.
+ const int order = a.compare(b);
+ if (order == 0) {
+ return 0;
+ }
+ return order < 0 ? -1 : 1;
+ }
+
+ /**
+ * Returns the smallest row key sorting strictly after `row`: `row` followed by one '\0' byte.
+ */
+ static std::string CreateClosestRowAfter(std::string row) { row.push_back('\0'); return row; }
};
} /* namespace hbase */
diff --git a/hbase-native-client/core/query.h b/hbase-native-client/utils/optional.h
similarity index 70%
copy from hbase-native-client/core/query.h
copy to hbase-native-client/utils/optional.h
index b706303..a05eab5 100644
--- a/hbase-native-client/core/query.h
+++ b/hbase-native-client/utils/optional.h
@@ -19,23 +19,14 @@
#pragma once
-#include "core/filter.h"
+#include <experimental/optional>
namespace hbase {
/**
- * Base class for read RPC calls (Get / Scan).
+ * An optional value that may or may not be present.
*/
-class Query {
- public:
- virtual ~Query() {}
+template <typename T>
+using optional = std::experimental::optional<T>;
- void SetFilter(std::unique_ptr<Filter> filter) { filter_ = std::move(filter); }
-
- const std::unique_ptr<Filter>& filter() const { return filter_; }
-
- protected:
- std::unique_ptr<Filter> filter_ = nullptr;
-};
-
-} // namespace hbase
+} /* namespace hbase */