You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/06/06 22:16:35 UTC
[2/3] hbase git commit: HBASE-17907 [C++] End to end Scans from
Client/Table
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index f71fbba..998e2f1 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -111,9 +111,10 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) {
folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
auto caller =
CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout())
- ->action([=, &del](std::shared_ptr<hbase::HBaseRpcController> controller,
- std::shared_ptr<hbase::RegionLocation> loc,
- std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> {
+ ->action([=, &del](
+ std::shared_ptr<hbase::HBaseRpcController> controller,
+ std::shared_ptr<hbase::RegionLocation> loc,
+ std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> {
return Call<hbase::Delete, hbase::Request, hbase::Response, folly::Unit>(
rpc_client, controller, loc, del, &hbase::RequestConverter::DeleteToMutateRequest,
[](const Response& r) -> folly::Unit { return folly::unit; });
@@ -143,4 +144,24 @@ folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::B
return caller->Call().then([caller](auto r) { return r; });
}
+
+/**
+ * Starts an asynchronous scan of this table; results are handed to @p consumer.
+ * The caller's @p scan is not modified: defaults are applied to a copy by SetDefaultScanConfig().
+ */
+void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer) {
+  auto scan_with_defaults = SetDefaultScanConfig(scan);
+  // NOTE(review): `scanner` goes out of scope when this returns; presumably AsyncClientScanner
+  // keeps itself alive while its RPCs are in flight -- confirm in async-client-scanner.
+  auto scanner = AsyncClientScanner::Create(
+      connection_, scan_with_defaults, table_name_, consumer, connection_conf_->pause(),
+      connection_conf_->max_retries(), connection_conf_->scan_timeout(),
+      connection_conf_->rpc_timeout(), connection_conf_->start_log_errors_count());
+  scanner->Start();
+}
+
+/**
+ * Returns a copy of @p scan whose caching and max result size fall back to the connection
+ * defaults (captured in the constructor) when the caller left them non-positive.
+ */
+std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) {
+  // Always work on a fresh copy: the start row of this object may be reset later.
+  auto configured = std::make_shared<hbase::Scan>(scan);
+  const bool caching_unset = configured->Caching() <= 0;
+  if (caching_unset) {
+    configured->SetCaching(default_scanner_caching_);
+  }
+  const bool max_result_size_unset = configured->MaxResultSize() <= 0;
+  if (max_result_size_unset) {
+    configured->SetMaxResultSize(default_scanner_max_result_size_);
+  }
+  return configured;
+}
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-async-table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index c8e9f2f..8c40dae 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -24,7 +24,9 @@
#include <memory>
#include <string>
#include <vector>
+
#include "core/async-batch-rpc-retrying-caller.h"
+#include "core/async-client-scanner.h"
#include "core/async-connection.h"
#include "core/async-rpc-retrying-caller-factory.h"
#include "core/async-rpc-retrying-caller.h"
@@ -34,6 +36,7 @@
#include "core/increment.h"
#include "core/put.h"
#include "core/result.h"
+#include "core/scan.h"
namespace hbase {
@@ -48,14 +51,22 @@ class RawAsyncTable {
: connection_(connection),
connection_conf_(connection->connection_conf()),
table_name_(table_name),
- rpc_client_(connection->rpc_client()) {}
+ rpc_client_(connection->rpc_client()) {
+ default_scanner_caching_ = connection_conf_->scanner_caching();
+ default_scanner_max_result_size_ = connection_conf_->scanner_max_result_size();
+ }
virtual ~RawAsyncTable() = default;
folly::Future<std::shared_ptr<Result>> Get(const hbase::Get& get);
- folly::Future<folly::Unit> Delete(const hbase::Delete& del);
- folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
+ folly::Future<folly::Unit> Delete(const hbase::Delete& del);
+
+ folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
+
folly::Future<folly::Unit> Put(const hbase::Put& put);
+
+ void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer);
+
void Close() {}
folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get(
@@ -69,6 +80,8 @@ class RawAsyncTable {
std::shared_ptr<ConnectionConfiguration> connection_conf_;
std::shared_ptr<pb::TableName> table_name_;
std::shared_ptr<RpcClient> rpc_client_;
+ int32_t default_scanner_caching_;
+ int64_t default_scanner_max_result_size_;
/* Methods */
template <typename REQ, typename PREQ, typename PRESP, typename RESP>
@@ -81,5 +94,7 @@ class RawAsyncTable {
template <typename RESP>
std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(
std::string row, std::chrono::nanoseconds rpc_timeout);
+
+ std::shared_ptr<hbase::Scan> SetDefaultScanConfig(const hbase::Scan& scan);
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-scan-result-consumer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-scan-result-consumer.h b/hbase-native-client/core/raw-scan-result-consumer.h
new file mode 100644
index 0000000..b7c3c48
--- /dev/null
+++ b/hbase-native-client/core/raw-scan-result-consumer.h
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Logging.h>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+// Lifecycle states for a ScanController implementation.
+// NOTE(review): the transitions between these states are not defined in this header; they are
+// presumably driven by the concrete controller -- confirm.
+enum class ScanControllerState { kInitialized, kSuspended, kTerminated, kDestroyed };
+
+// Lifecycle states for a ScanResumer implementation.
+// NOTE(review): transition rules live outside this header -- confirm semantics.
+enum class ScanResumerState { kInitialized, kSuspended, kResumed };
+
+/**
+ * Used to resume a suspended scan; instances are returned by ScanController::Suspend().
+ */
+class ScanResumer {
+ public:
+  virtual ~ScanResumer() = default;
+
+  /**
+   * Resumes the scan. You are free to call it multiple times, but only the first call takes
+   * effect.
+   */
+  virtual void Resume() = 0;
+};
+
+/**
+ * Used to suspend or stop a scan.
+ *
+ * Only call the methods below from inside RawScanResultConsumer::OnNext() or
+ * RawScanResultConsumer::OnHeartbeat(); calling them anywhere else is an error.
+ *
+ * Call at most one of the methods below, i.e. either Suspend() or Terminate() (calling neither
+ * is also fine). The methods are not reentrant, and calling one after one of them has already
+ * been called is an error.
+ *
+ * NOTE(review): the Java client reports these misuses with IllegalStateException; how the C++
+ * client reports them is up to the concrete ScanController implementation -- confirm there.
+ */
+class ScanController {
+ public:
+  virtual ~ScanController() = default;
+
+  /**
+   * Suspends the scan.
+   *
+   * Data is no longer fetched in the background, i.e. OnNext() will not be called again until
+   * the scan is resumed.
+   * @return a resumer used to resume the scan later.
+   */
+  virtual std::shared_ptr<ScanResumer> Suspend() = 0;
+
+  /**
+   * Terminates the scan.
+   *
+   * Useful when OnNext() has already received enough results, or when OnHeartbeat() finds that
+   * the scan has taken too much time.
+   */
+  virtual void Terminate() = 0;
+};
+
+/**
+ * Receives Results for an asynchronous scan.
+ *
+ * OnNext() is called on the thread that sends requests to the HBase service. If you want the
+ * scanner to keep fetching data in the background while you process what it has returned,
+ * move the processing work to another thread so that OnNext() returns immediately. Do NOT do
+ * any time-consuming work in any of the methods below unless you know what you are doing.
+ *
+ * Every callback has an empty default implementation; override only the ones you need.
+ */
+class RawScanResultConsumer {
+ public:
+  virtual ~RawScanResultConsumer() = default;
+
+  /**
+   * Indicates that some data has been received.
+   * @param results the data fetched from the HBase service.
+   * @param controller used to suspend or terminate the scan. The controller is only valid
+   *        within the scope of OnNext(): call its methods inside OnNext() only, and do NOT
+   *        store it to call it later.
+   */
+  virtual void OnNext(const std::vector<std::shared_ptr<Result>> &results,
+                      std::shared_ptr<ScanController> controller) {}
+
+  /**
+   * Indicates that a heartbeat message arrived but not enough cells have accumulated yet to
+   * call OnNext().
+   *
+   * This gives you a chance to terminate a slow scan.
+   * @param controller used to suspend or terminate the scan. The controller is only valid
+   *        within the scope of OnHeartbeat(): call its methods inside OnHeartbeat() only, and
+   *        do NOT store it to call it later.
+   */
+  virtual void OnHeartbeat(std::shared_ptr<ScanController> controller) {}
+
+  /**
+   * Indicates that an unrecoverable error occurred and the scan has been terminated.
+   *
+   * OnComplete() is not called after OnError().
+   * @param error the error that terminated the scan.
+   */
+  virtual void OnError(const folly::exception_wrapper &error) {}
+
+  /**
+   * Indicates that the scan completed normally.
+   */
+  virtual void OnComplete() {}
+};
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/region-location.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index 4087d94..d5d9d67 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -26,7 +26,7 @@
namespace hbase {
-enum RegionLocateType { kBefore, kCurrent, kAfter };
+enum class RegionLocateType { kBefore, kCurrent, kAfter };
/**
* @brief class to hold where a region is located.
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter-test.cc b/hbase-native-client/core/request-converter-test.cc
index c01f16c..6c07a19 100644
--- a/hbase-native-client/core/request-converter-test.cc
+++ b/hbase-native-client/core/request-converter-test.cc
@@ -83,7 +83,6 @@ TEST(RequestConverter, ToScan) {
scan.SetReversed(true);
scan.SetStartRow(start_row);
scan.SetStopRow(stop_row);
- scan.SetSmall(true);
scan.SetCaching(3);
scan.SetConsistency(hbase::pb::Consistency::TIMELINE);
scan.SetCacheBlocks(true);
@@ -105,7 +104,7 @@ TEST(RequestConverter, ToScan) {
EXPECT_TRUE(msg->scan().reversed());
EXPECT_EQ(msg->scan().start_row(), start_row);
EXPECT_EQ(msg->scan().stop_row(), stop_row);
- EXPECT_TRUE(msg->scan().small());
+ EXPECT_FALSE(msg->scan().small());
EXPECT_EQ(msg->scan().caching(), 3);
EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE);
EXPECT_TRUE(msg->scan().cache_blocks());
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index a1e63fe..6eb2f04 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -53,19 +53,11 @@ std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
return pb_req;
}
-std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
- const std::string ®ion_name) {
- auto pb_req = Request::scan();
-
- auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
-
- RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
-
- auto pb_scan = pb_msg->mutable_scan();
+std::unique_ptr<hbase::pb::Scan> RequestConverter::ToScan(const Scan &scan) {
+ auto pb_scan = std::make_unique<hbase::pb::Scan>();
pb_scan->set_max_versions(scan.MaxVersions());
pb_scan->set_cache_blocks(scan.CacheBlocks());
pb_scan->set_reversed(scan.IsReversed());
- pb_scan->set_small(scan.IsSmall());
pb_scan->set_caching(scan.Caching());
pb_scan->set_start_row(scan.StartRow());
pb_scan->set_stop_row(scan.StopRow());
@@ -94,12 +86,78 @@ std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release());
}
- // TODO We will change this later.
+ return std::move(pb_scan);
+}
+
+/**
+ * Builds a ScanRequest carrying @p scan for the region named @p region_name.
+ * number_of_rows and close_scanner are left unset; the four-argument (scan, region_name,
+ * num_rows, close_scanner) overload sets them.
+ */
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  // set_allocated_scan() takes ownership of the converted protobuf Scan.
+  pb_msg->set_allocated_scan(ToScan(scan).release());
+
+  SetCommonScanRequestFields(pb_msg, /* renew = */ false);
+
+  return pb_req;
+}
+
+/**
+ * Builds a ScanRequest carrying @p scan for the region named @p region_name, asking for
+ * @p num_rows rows and, when @p close_scanner is true, asking the server to close the scanner.
+ */
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name,
+                                                         int32_t num_rows, bool close_scanner) {
+  auto pb_req = Request::scan();
+  auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg());
+
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+
+  // set_allocated_scan() takes ownership of the converted protobuf Scan.
+  pb_msg->set_allocated_scan(ToScan(scan).release());
+
+  pb_msg->set_number_of_rows(num_rows);
+  pb_msg->set_close_scanner(close_scanner);
+
+  SetCommonScanRequestFields(pb_msg, /* renew = */ false);
+
+  return pb_req;
+}
+
+/**
+ * Builds a ScanRequest addressed by @p scanner_id (no region and no Scan payload are set),
+ * asking for @p num_rows rows and optionally closing the scanner.
+ */
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner) {
+  auto request = Request::scan();
+  auto scan_req = std::static_pointer_cast<ScanRequest>(request->req_msg());
+  scan_req->set_scanner_id(scanner_id);
+  scan_req->set_number_of_rows(num_rows);
+  scan_req->set_close_scanner(close_scanner);
+  SetCommonScanRequestFields(scan_req, /* renew = */ false);
+  return request;
+}
+
+/**
+ * Builds a ScanRequest addressed by @p scanner_id that also carries the call sequence number
+ * @p next_call_seq_id and the @p renew flag, asking for @p num_rows rows and optionally closing
+ * the scanner.
+ */
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner,
+                                                         int64_t next_call_seq_id, bool renew) {
+  auto request = Request::scan();
+  auto scan_req = std::static_pointer_cast<ScanRequest>(request->req_msg());
+  scan_req->set_scanner_id(scanner_id);
+  scan_req->set_next_call_seq(next_call_seq_id);
+  scan_req->set_number_of_rows(num_rows);
+  scan_req->set_close_scanner(close_scanner);
+  SetCommonScanRequestFields(scan_req, renew);
+  return request;
+}
+
+/**
+ * Sets the ScanRequest fields shared by every ToScanRequest overload: partial-result handling,
+ * heartbeat handling and scan-metrics tracking are all reported as off, and renew is set to
+ * @p renew.
+ */
+void RequestConverter::SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg,
+ bool renew) {
+ // TODO We will change these later when we implement partial results and heartbeats, etc
pb_msg->set_client_handles_partials(false);
pb_msg->set_client_handles_heartbeats(false);
pb_msg->set_track_scan_metrics(false);
-
- return pb_req;
+ pb_msg->set_renew(renew);
+ // TODO: set scan limit
}
std::unique_ptr<Request> RequestConverter::ToMultiRequest(
@@ -123,7 +181,6 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest(
}
}
- VLOG(3) << "Multi Req:-" << pb_req->req_msg()->ShortDebugString();
return pb_req;
}
@@ -190,13 +247,13 @@ std::unique_ptr<MutationProto> RequestConverter::ToMutation(const MutationType t
DeleteType RequestConverter::ToDeleteType(const CellType type) {
switch (type) {
- case DELETE:
+ case CellType::DELETE:
return pb::MutationProto_DeleteType_DELETE_ONE_VERSION;
- case DELETE_COLUMN:
+ case CellType::DELETE_COLUMN:
return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS;
- case DELETE_FAMILY:
+ case CellType::DELETE_FAMILY:
return pb::MutationProto_DeleteType_DELETE_FAMILY;
- case DELETE_FAMILY_VERSION:
+ case CellType::DELETE_FAMILY_VERSION:
return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION;
default:
throw std::runtime_error("Unknown delete type: " + folly::to<std::string>(type));
@@ -216,12 +273,11 @@ std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put,
pb_msg->set_allocated_mutation(
ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release());
- VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString();
return pb_req;
}
std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del,
- const std::string ®ion_name) {
+ const std::string ®ion_name) {
auto pb_req = Request::mutate();
auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg());
RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index 6d57161..c807f45 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -66,9 +66,20 @@ class RequestConverter {
*/
static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string ®ion_name);
+ static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string ®ion_name,
+ int32_t num_rows, bool close_scanner);
+
+ static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows,
+ bool close_scanner);
+
+ static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows,
+ bool close_scanner, int64_t next_call_seq_id,
+ bool renew);
+
static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion ®ion_requests);
- static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,const std::string ®ion_name);
+ static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,
+ const std::string ®ion_name);
static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string ®ion_name);
@@ -91,8 +102,10 @@ class RequestConverter {
*/
static void SetRegion(const std::string ®ion_name, pb::RegionSpecifier *region_specifier);
static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get);
+ static std::unique_ptr<hbase::pb::Scan> ToScan(const Scan &scan);
static DeleteType ToDeleteType(const CellType type);
static bool IsDelete(const CellType type);
+ static void SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest>, bool renew);
};
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 94b7875..d2d719b 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -54,7 +54,7 @@ std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& re
}
std::shared_ptr<Result> ResponseConverter::ToResult(
- const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) {
+ const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) {
std::vector<std::shared_ptr<Cell>> vcells;
for (auto cell : result.cell()) {
std::shared_ptr<Cell> pcell =
@@ -82,34 +82,38 @@ std::shared_ptr<Result> ResponseConverter::ToResult(
+// Convenience overload: extracts the ScanResponse message and the cell scanner carried by
+// `resp` and delegates to the (ScanResponse, CellScanner) overload.
std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
- VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
- int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
- : scan_resp->results_size();
+ return FromScanResponse(scan_resp, resp.cell_scanner());
+}
+
+std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
+ const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) {
+ VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString()
+ << " cell_scanner:" << (cell_scanner == nullptr);
+ int num_results =
+ cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size();
std::vector<std::shared_ptr<Result>> results{static_cast<size_t>(num_results)};
for (int i = 0; i < num_results; i++) {
- if (resp.cell_scanner() != nullptr) {
+ if (cell_scanner != nullptr) {
// Cells are out in cellblocks. Group them up again as Results. How many to read at a
// time will be found in getCellsLength -- length here is how many Cells in the i'th Result
int num_cells = scan_resp->cells_per_result(i);
std::vector<std::shared_ptr<Cell>> vcells;
- while (resp.cell_scanner()->Advance()) {
- vcells.push_back(resp.cell_scanner()->Current());
- }
- // TODO: check associated cell count?
-
- if (vcells.size() != num_cells) {
- std::string msg = "Results sent from server=" + std::to_string(num_results) +
- ". But only got " + std::to_string(i) +
- " results completely at client. Resetting the scanner to scan again.";
- LOG(ERROR) << msg;
- throw std::runtime_error(msg);
+ for (int j = 0; j < num_cells; j++) {
+ if (!cell_scanner->Advance()) {
+ std::string msg = "Results sent from server=" + std::to_string(num_results) +
+ ". But only got " + std::to_string(i) +
+ " results completely at client. Resetting the scanner to scan again.";
+ LOG(ERROR) << msg;
+ throw std::runtime_error(msg);
+ }
+ vcells.push_back(cell_scanner->Current());
}
// TODO: handle partial results per Result by checking partial_flag_per_result
results[i] = std::make_shared<Result>(vcells, false, scan_resp->stale(), false);
} else {
- results[i] = ToResult(scan_resp->results(i), resp.cell_scanner());
+ results[i] = ToResult(scan_resp->results(i), cell_scanner);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/response-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 0fdde89..b518d1c 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -39,7 +39,7 @@ class ResponseConverter {
~ResponseConverter();
static std::shared_ptr<Result> ToResult(const hbase::pb::Result& result,
- const std::unique_ptr<CellScanner>& cell_scanner);
+ const std::shared_ptr<CellScanner> cell_scanner);
/**
* @brief Returns a Result object created by PB Message in passed Response object.
@@ -51,6 +51,9 @@ class ResponseConverter {
static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);
+ static std::vector<std::shared_ptr<Result>> FromScanResponse(
+ const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner);
+
static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req,
const Response& resp);
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/result-scanner.h b/hbase-native-client/core/result-scanner.h
new file mode 100644
index 0000000..9460521
--- /dev/null
+++ b/hbase-native-client/core/result-scanner.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <functional>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/cell.h"
+#include "core/result.h"
+
+namespace hbase {
+
+/**
+ * Interface for client-side scanning. Use Table to obtain instances.
+ */
+class ResultScanner {
+  // TODO: should we implement forward iterators?
+
+ public:
+  virtual ~ResultScanner() = default;
+
+  /** Closes the scanner. */
+  virtual void Close() = 0;
+
+  /**
+   * Returns the next Result of the scan.
+   * NOTE(review): how end-of-scan is signalled (presumably nullptr) is left to implementations
+   * -- confirm and document here.
+   */
+  virtual std::shared_ptr<Result> Next() = 0;
+};
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/result-test.cc b/hbase-native-client/core/result-test.cc
index 520f4b9..3299ffc 100644
--- a/hbase-native-client/core/result-test.cc
+++ b/hbase-native-client/core/result-test.cc
@@ -17,6 +17,7 @@
*
*/
+#include <glog/logging.h>
#include <gtest/gtest.h>
#include <limits>
#include <memory>
@@ -71,7 +72,7 @@ void PopulateCells(std::vector<std::shared_ptr<Cell> > &cells) {
}
default: {
cells.push_back(std::make_shared<Cell>(
- row, family, column, std::numeric_limits<long>::max(), value, CellType::PUT));
+ row, family, column, std::numeric_limits<int64_t>::max(), value, CellType::PUT));
}
}
}
@@ -255,7 +256,7 @@ TEST(Result, FilledResult) {
break;
}
default: {
- EXPECT_EQ(std::numeric_limits<long>::max(), version_map.first);
+ EXPECT_EQ(std::numeric_limits<int64_t>::max(), version_map.first);
EXPECT_EQ(value, version_map.second);
}
}
@@ -297,3 +298,24 @@ TEST(Result, FilledResult) {
EXPECT_EQ("value-9", qual_val_map.second);
}
}
+
+// EstimatedSize() must grow as cells are added to a Result.
+TEST(Result, ResultEstimatedSize) {
+  CellType cell_type = CellType::PUT;
+  int64_t timestamp = std::numeric_limits<int64_t>::max();
+  std::vector<std::shared_ptr<Cell> > cells;
+  Result empty(cells, true, false, false);
+
+  // EstimatedSize() adds row_.capacity(). With the small-string optimization an empty
+  // std::string already has non-zero capacity, so compare against that, not against 0.
+  EXPECT_EQ(empty.EstimatedSize(), sizeof(Result) + std::string().capacity());
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type));
+  Result result1(cells, true, false, false);
+  EXPECT_GT(result1.EstimatedSize(), empty.EstimatedSize());
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type));
+  Result result2(cells, true, false, false);
+  EXPECT_GT(result2.EstimatedSize(), result1.EstimatedSize());
+
+  LOG(INFO) << empty.EstimatedSize();
+  LOG(INFO) << result1.EstimatedSize();
+  LOG(INFO) << result2.EstimatedSize();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc
index 9d9ddb3..44b4c86 100644
--- a/hbase-native-client/core/result.cc
+++ b/hbase-native-client/core/result.cc
@@ -25,13 +25,7 @@ Result::~Result() {}
+// Stores the given cell pointers as-is. The family/qualifier/timestamp map is no longer built
+// here; Result::Map() rebuilds it from cells_ on demand.
Result::Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale,
bool partial)
- : exists_(exists), stale_(stale), partial_(partial) {
- for (const auto &cell : cells) {
- cells_.push_back(cell);
- // We create the map when cells are added. unlike java where map is created
- // when result.getMap() is called
- result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
- }
+ : exists_(exists), stale_(stale), partial_(partial), cells_(cells) {
+ // The row key is taken from the first cell; a Result without cells gets an empty row.
row_ = (cells_.size() == 0 ? "" : cells_[0]->Row());
}
@@ -43,10 +37,10 @@ Result::Result(const Result &result) {
if (!result.cells_.empty()) {
for (const auto &cell : result.cells_) {
cells_.push_back(cell);
- result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value();
}
}
}
+
const std::vector<std::shared_ptr<Cell> > &Result::Cells() const { return cells_; }
std::vector<std::shared_ptr<Cell> > Result::ColumnCells(const std::string &family,
@@ -74,13 +68,12 @@ const std::shared_ptr<Cell> Result::ColumnLatestCell(const std::string &family,
return nullptr;
}
-std::shared_ptr<std::string> Result::Value(const std::string &family,
- const std::string &qualifier) const {
+/**
+ * Returns the value of the cell that ColumnLatestCell() picks for family:qualifier, or an
+ * empty optional when ColumnLatestCell() returns null.
+ */
+optional<std::string> Result::Value(const std::string &family, const std::string &qualifier) const {
std::shared_ptr<Cell> latest_cell(ColumnLatestCell(family, qualifier));
if (latest_cell.get()) {
- return std::make_shared<std::string>(latest_cell->Value());
+ return optional<std::string>(latest_cell->Value());
}
- return nullptr;
+ return optional<std::string>();
}
bool Result::IsEmpty() const { return cells_.empty(); }
@@ -89,24 +82,33 @@ const std::string &Result::Row() const { return row_; }
int Result::Size() const { return cells_.size(); }
-const ResultMap &Result::Map() const { return result_map_; }
+/**
+ * Builds and returns a family -> qualifier -> timestamp -> value map from cells_.
+ * The map is not cached: every call walks all cells again.
+ */
+ResultMap Result::Map() const {
+  ResultMap by_family;
+  for (const auto &c : cells_) {
+    auto &versions = by_family[c->Family()][c->Qualifier()];
+    versions[c->Timestamp()] = c->Value();
+  }
+  return by_family;
+}
-const std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
+/**
+ * Returns a qualifier -> value map for @p family. For each qualifier only the first entry of
+ * its timestamp map is kept (mirroring Result.java). Returns an empty map when the Result is
+ * empty or has no cells in @p family.
+ */
+std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
std::map<std::string, std::string> family_map;
if (!IsEmpty()) {
- for (auto itr = result_map_.begin(); itr != result_map_.end(); ++itr) {
- if (family == itr->first) {
- for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) {
- for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) {
- // We break after inserting the first value. Result.java takes only
- // the first value
- family_map[qitr->first] = vitr->second;
- break;
- }
- }
+ // NOTE(review): Map() materializes every family on each call, although only `family` is
+ // read below.
+ auto result_map = Map();
+ auto itr = result_map.find(family);
+ if (itr == result_map.end()) {
+ return family_map;
+ }
+
+ for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) {
+ for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) {
+ // We break after inserting the first value. Result.java takes only
+ // the first value
+ family_map[qitr->first] = vitr->second;
+ break;
}
}
}
+
return family_map;
}
@@ -131,4 +133,14 @@ std::string Result::DebugString() const {
return ret;
}
+/**
+ * Returns a rough estimate of the memory held by this Result: the object itself, the row
+ * buffer's capacity, and for every cell one shared_ptr slot plus the Cell's own
+ * EstimatedSize().
+ */
+size_t Result::EstimatedSize() const {
+  size_t s = sizeof(Result);
+  s += row_.capacity();
+  // Iterate by reference: copying each shared_ptr would cost an atomic inc/dec per cell.
+  for (const auto &c : cells_) {
+    s += sizeof(std::shared_ptr<Cell>);
+    s += c->EstimatedSize();
+  }
+  return s;
+}
+
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h
index 627d161..f18071b 100644
--- a/hbase-native-client/core/result.h
+++ b/hbase-native-client/core/result.h
@@ -26,6 +26,7 @@
#include <vector>
#include "core/cell.h"
+#include "utils/optional.h"
namespace hbase {
@@ -79,7 +80,7 @@ class Result {
* @param family - column family
* @param qualifier - column qualifier
*/
- std::shared_ptr<std::string> Value(const std::string &family, const std::string &qualifier) const;
+ optional<std::string> Value(const std::string &family, const std::string &qualifier) const;
/**
* @brief Returns if the underlying Cell vector is empty or not
@@ -104,23 +105,32 @@ class Result {
* All other map returning methods make use of this map internally
* The Map is created when the Result instance is created
*/
- const ResultMap &Map() const;
+ ResultMap Map() const;
/**
* @brief Map of qualifiers to values.
* Returns a Map of the form: Map<qualifier,value>
* @param family - column family to get
*/
- const std::map<std::string, std::string> FamilyMap(const std::string &family) const;
+ std::map<std::string, std::string> FamilyMap(const std::string &family) const;
std::string DebugString() const;
+  /** Returns the exists_ flag of this Result (fixed at construction). */
+  bool Exists() const { return exists_; }
+
+  /** Returns the stale_ flag of this Result. When ScanResultCache merges partial results, the
+   * merged Result is stale if any of its parts was. */
+  bool Stale() const { return stale_; }
+
+  /** Returns whether this Result carries only part of a row, i.e. more cells of the same row
+   * are expected in a following Result (see the comments in ScanResultCache). */
+  bool Partial() const { return partial_; }
+
+  /** Returns estimated size of the Result object including deep heap space usage
+   * of its Cells and data. Notice that this is a very rough estimate. */
+  size_t EstimatedSize() const;
+
private:
bool exists_ = false;
bool stale_ = false;
bool partial_ = false;
std::string row_ = "";
std::vector<std::shared_ptr<Cell> > cells_;
- ResultMap result_map_;
};
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan-result-cache-test.cc b/hbase-native-client/core/scan-result-cache-test.cc
new file mode 100644
index 0000000..0bf83ce
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache-test.cc
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "core/cell.h"
+#include "core/result.h"
+#include "core/scan-result-cache.h"
+
+using hbase::ScanResultCache;
+using hbase::Result;
+using hbase::Cell;
+using hbase::CellType;
+
+using ResultVector = std::vector<std::shared_ptr<Result>>;
+
+// Builds a PUT cell whose row key and value are both the decimal form of `key`, so tests can
+// check Value() against the row number.
+std::shared_ptr<Cell> CreateCell(const int32_t &key, const std::string &family,
+                                 const std::string &column) {
+  const std::string row_and_value = folly::to<std::string>(key);
+  const int64_t max_timestamp = std::numeric_limits<int64_t>::max();
+  return std::make_shared<Cell>(row_and_value, family, column, max_timestamp, row_and_value,
+                                CellType::PUT);
+}
+
+// Wraps a single cell in a Result; only the trailing `partial` flag varies between test inputs,
+// the two preceding flags are always false.
+std::shared_ptr<Result> CreateResult(std::shared_ptr<Cell> cell, bool partial) {
+  std::vector<std::shared_ptr<Cell>> single_cell{cell};
+  return std::make_shared<Result>(single_cell, false, false, partial);
+}
+
+// With nothing buffered the cache is a pass-through: empty batches (heartbeat or not) yield no
+// results, and a batch made only of non-partial results is returned unchanged.
+TEST(ScanResultCacheTest, NoPartial) {
+  ScanResultCache cache;
+  ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, false));
+  ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, true));
+  int32_t count = 10;
+  ResultVector results{};
+  for (int32_t i = 0; i < count; i++) {
+    results.push_back(CreateResult(CreateCell(i, "cf", "cq1"), false));
+  }
+  ASSERT_EQ(results, cache.AddAndGet(results, false));
+}
+
+// Row 1 arrives as three partial chunks. The chunks are buffered until an empty,
+// non-heartbeat response, which emits the merged row.
+TEST(ScanResultCacheTest, Combine1) {
+  ScanResultCache cache;
+  auto prev_result = CreateResult(CreateCell(0, "cf", "cq1"), true);
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+  // Only the last result of a batch is inspected for the partial flag, so prev_result (row 0)
+  // is returned immediately even though it is flagged; result1 (row 1) is buffered.
+  auto results = cache.AddAndGet(ResultVector{prev_result, result1}, false);
+  ASSERT_EQ(1L, results.size());
+  ASSERT_EQ(prev_result, results[0]);
+
+  // More chunks of the same row are buffered silently; an empty heartbeat does not flush them.
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{}, true).size());
+
+  // An empty non-heartbeat response flushes the buffer: one Result holding all three cells.
+  results = cache.AddAndGet(ResultVector{}, false);
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(3, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3")));
+}
+
+// A row change completes the buffered row. A partial chunk of row 2 completes row 1. A later
+// non-partial row 3 completes row 2, and row 3 is returned right after it.
+TEST(ScanResultCacheTest, Combine2) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq2"), false);
+
+  // Three partial chunks of row 1 are buffered.
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size());
+
+  // Row changes to 2: row 1 is emitted with its three cells, row 2 starts buffering.
+  auto results = cache.AddAndGet(ResultVector{next_result1}, false);
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(3, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3")));
+
+  // A complete row 3: buffered row 2 is emitted first, then row 3 itself.
+  results = cache.AddAndGet(ResultVector{next_to_next_result1}, false);
+  ASSERT_EQ(2, results.size());
+  ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(1, results[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(3, folly::to<int32_t>(results[1]->Row()));
+  ASSERT_EQ(1, results[1]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*results[1]->Value("cf", "cq2")));
+}
+
+// A batch holding a complete row (2) followed by a partial chunk (row 3). Buffered row 1 is
+// emitted, row 2 passes through, and row 3 stays buffered until the next empty non-heartbeat
+// response.
+TEST(ScanResultCacheTest, Combine3) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), false);
+  auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq1"), true);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size());
+
+  auto results = cache.AddAndGet(ResultVector{next_result1, next_to_next_result1}, false);
+
+  ASSERT_EQ(2, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+  ASSERT_EQ(2, folly::to<int32_t>(results[1]->Row()));
+  ASSERT_EQ(1, results[1]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[1]->Value("cf", "cq1")));
+
+  // Empty non-heartbeat response: flush the buffered chunk of row 3.
+  results = cache.AddAndGet(ResultVector{}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(3, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(1, results[0]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+}
+
+// The final chunk of a split row is not flagged partial (result2, next_result2). It must still
+// be merged into the buffered chunks of the same row, not returned as a separate row.
+TEST(ScanResultCacheTest, Combine4) {
+  ScanResultCache cache;
+  auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), false);
+  auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto next_result2 = CreateResult(CreateCell(2, "cf", "cq2"), false);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size());
+
+  // result2 closes row 1 (merged with result1); next_result1 starts buffering row 2.
+  auto results = cache.AddAndGet(ResultVector{result2, next_result1}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+
+  // next_result2 closes row 2 the same way.
+  results = cache.AddAndGet(ResultVector{next_result2}, false);
+
+  ASSERT_EQ(1, results.size());
+  ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row()));
+  ASSERT_EQ(2, results[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1")));
+  ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq2")));
+}
+
+// Checks the assumption behind Result::EstimatedSize(), which adds row_.capacity(): a string's
+// capacity() is never smaller than its size(). The sizes are also logged for manual inspection.
+TEST(ScanResultCacheTest, SizeOf) {
+  std::string e{""};
+  std::string f{"f"};
+  std::string foo{"foo"};
+
+  LOG(INFO) << sizeof(e) << " " << e.capacity();
+  LOG(INFO) << sizeof(f) << " " << f.capacity();
+  LOG(INFO) << sizeof(foo) << " " << foo.capacity();
+
+  // Without these checks the test could never fail.
+  EXPECT_LE(e.size(), e.capacity());
+  EXPECT_LE(f.size(), f.capacity());
+  EXPECT_LE(foo.size(), foo.capacity());
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan-result-cache.cc b/hbase-native-client/core/scan-result-cache.cc
new file mode 100644
index 0000000..62a51e0
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache.cc
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/scan-result-cache.h"
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+/**
+ * Add the given results to the cache and get back the rows that are now complete.
+ * Partial results (a row split across RPC responses) are buffered in partial_results_ until the
+ * row is known to be finished, then merged into a single Result.
+ * @param results the results of a scan next call; may be empty.
+ * @param is_hearthbeat whether the results came from a heartbeat response. An empty heartbeat
+ *        does not flush buffered partials, since more cells of the row may still arrive.
+ * @return the complete results, possibly empty. Every non-empty return value is also counted
+ *         into num_complete_rows_.
+ */
+std::vector<std::shared_ptr<Result>> ScanResultCache::AddAndGet(
+    const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat) {
+  // If no results were returned it indicates that either we have all the partial results
+  // necessary to construct the complete result or the server had to send a heartbeat message
+  // to the client to keep the client-server connection alive
+  if (results.empty()) {
+    // If this response was an empty heartbeat message, then we have not exhausted the region
+    // and thus there may be more partials server side that still need to be added to the partial
+    // list before we form the complete Result
+    if (!partial_results_.empty() && !is_hearthbeat) {
+      return UpdateNumberOfCompleteResultsAndReturn(
+          std::vector<std::shared_ptr<Result>>{Combine()});
+    }
+    return std::vector<std::shared_ptr<Result>>{};
+  }
+  // In every RPC response there should be at most a single partial result. Furthermore, if
+  // there is a partial result, it is guaranteed to be in the last position of the array.
+  // Only `last` is checked for the partial flag below.
+  auto last = results[results.size() - 1];
+  if (last->Partial()) {
+    // Nothing buffered yet: everything before `last` is complete; start buffering `last`.
+    if (partial_results_.empty()) {
+      partial_results_.push_back(last);
+      std::vector<std::shared_ptr<Result>> new_results;
+      std::copy_n(results.begin(), results.size() - 1, std::back_inserter(new_results));
+      return UpdateNumberOfCompleteResultsAndReturn(new_results);
+    }
+    // We have only one result and it is partial
+    if (results.size() == 1) {
+      // check if there is a row change
+      if (partial_results_.at(0)->Row() == last->Row()) {
+        partial_results_.push_back(last);
+        return std::vector<std::shared_ptr<Result>>{};
+      }
+      // Row changed: the buffered row is finished. Combine it before buffering `last`, because
+      // Combine() consumes and clears partial_results_.
+      auto complete_result = Combine();
+      partial_results_.push_back(last);
+      return UpdateNumberOfCompleteResultsAndReturn(complete_result);
+    }
+    // We have some complete results
+    // (flush the buffered row together with results[0..size-2], then buffer `last`).
+    auto results_to_return = PrependCombined(results, results.size() - 1);
+    partial_results_.push_back(last);
+    return UpdateNumberOfCompleteResultsAndReturn(results_to_return);
+  }
+  // `last` is complete, so any buffered row ends within this batch.
+  if (!partial_results_.empty()) {
+    return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size()));
+  }
+  return UpdateNumberOfCompleteResultsAndReturn(results);
+}
+
+// Discards any buffered partial results; the num_complete_rows_ counter is left untouched.
+void ScanResultCache::Clear() {
+  partial_results_.clear();
+}
+
+/**
+ * Merges the cells of `partial_results` (in order) into one Result. The merged Result is stale
+ * if any part was stale, and it is never marked partial.
+ * Throws std::runtime_error (by value, so `catch (const std::exception &)` works and nothing
+ * leaks) if the parts belong to different rows, or if any part other than the last lacks the
+ * partial flag.
+ */
+std::shared_ptr<Result> ScanResultCache::CreateCompleteResult(
+    const std::vector<std::shared_ptr<Result>> &partial_results) {
+  if (partial_results.empty()) {
+    return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{}, false, false, false);
+  }
+  std::vector<std::shared_ptr<Cell>> cells{};
+  bool stale = false;
+  std::string prev_row = "";
+  std::string current_row = "";
+  size_t i = 0;
+  for (const auto &r : partial_results) {
+    current_row = r->Row();
+    if (i != 0 && prev_row != current_row) {
+      throw std::runtime_error(
+          "Cannot form complete result. Rows of partial results do not match.");
+    }
+    // Ensure that all Results except the last one are marked as partials. The last result
+    // may not be marked as a partial because Results are only marked as partials when
+    // the scan on the server side must be stopped due to reaching the maxResultSize.
+    // Visualizing it makes it easier to understand:
+    // maxResultSize: 2 cells
+    // (-x-) represents cell number x in a row
+    // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+    // How row1 will be returned by the server as partial Results:
+    // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+    // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+    // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+    if (i != partial_results.size() - 1 && !r->Partial()) {
+      throw std::runtime_error("Cannot form complete result. Result is missing partial flag.");
+    }
+    prev_row = current_row;
+    stale = stale || r->Stale();
+    for (const auto &c : r->Cells()) {
+      cells.push_back(c);
+    }
+    i++;
+  }
+
+  return std::make_shared<Result>(cells, false, stale, false);
+}
+
+// Merges everything buffered into one Result and empties the buffer. The merge happens before
+// the clear, so if CreateCompleteResult throws, the buffered partials are left in place.
+std::shared_ptr<Result> ScanResultCache::Combine() {
+  std::shared_ptr<Result> merged = CreateCompleteResult(partial_results_);
+  partial_results_.clear();
+  return merged;
+}
+
+// Returns the combined buffered row followed by results[0, length).
+// If results[0] belongs to the buffered row, it is folded into the row first.
+// Requires partial_results_ to be non-empty.
+std::vector<std::shared_ptr<Result>> ScanResultCache::PrependCombined(
+    const std::vector<std::shared_ptr<Result>> &results, int length) {
+  if (length == 0) {
+    return std::vector<std::shared_ptr<Result>>{Combine()};
+  }
+  // The final chunk of a split row is not flagged partial, so detect it by comparing rows.
+  const bool continues_buffered_row = partial_results_[0]->Row() == results[0]->Row();
+  size_t first_unmerged = 0;
+  if (continues_buffered_row) {
+    partial_results_.push_back(results[0]);
+    first_unmerged = 1;
+    --length;
+  }
+  std::vector<std::shared_ptr<Result>> merged{Combine()};
+  std::copy_n(results.begin() + first_unmerged, length, std::back_inserter(merged));
+  return merged;
+}
+
+// Wraps one complete Result in a vector and counts it via the vector overload.
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::shared_ptr<Result> &result) {
+  const std::vector<std::shared_ptr<Result>> single(1, result);
+  return UpdateNumberOfCompleteResultsAndReturn(single);
+}
+
+// Adds the number of `results` to num_complete_rows_ and hands them back unchanged.
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::vector<std::shared_ptr<Result>> &results) {
+  const auto added = static_cast<int64_t>(results.size());
+  num_complete_rows_ += added;
+  return results;
+}
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan-result-cache.h b/hbase-native-client/core/scan-result-cache.h
new file mode 100644
index 0000000..5d3d0ab
--- /dev/null
+++ b/hbase-native-client/core/scan-result-cache.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Logging.h>
+#include <algorithm>
+#include <chrono>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+#include "if/HBase.pb.h"
+
+namespace hbase {
+
+/**
+ * Reassembles rows that the server split into several partial Results, so callers only ever see
+ * complete rows.
+ */
+class ScanResultCache {
+  // In Java, there are 3 different implementations for this. We are not doing partial results,
+  // or scan batching in native code for now, so this version is simpler and
+  // only deals with giving back complete rows as Result. It is more or less implementation
+  // of CompleteScanResultCache.java
+
+ public:
+  /**
+   * Add the given results to the cache and get back the rows that are now complete.
+   * @param results the results of a scan next call; may be empty.
+   * @param is_hearthbeat whether the results came from a heartbeat response; an empty heartbeat
+   *        does not flush buffered partial results.
+   * @return the complete results, possibly empty.
+   */
+  std::vector<std::shared_ptr<Result>> AddAndGet(
+      const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat);
+
+  /** Discards buffered partial results. num_complete_rows() is not reset. */
+  void Clear();
+
+  /** Total number of complete Results handed out so far. */
+  int64_t num_complete_rows() const { return num_complete_rows_; }
+
+ private:
+  /**
+   * Forms a single result from the given list of partial results. This method is
+   * useful for reconstructing partial results on the client side.
+   * @param partial_results list of partial results
+   * @return The complete result that is formed by combining all of the partial results together
+   *         (an empty Result if the list is empty). Throws if the parts belong to different rows
+   *         or a non-final part lacks the partial flag.
+   */
+  static std::shared_ptr<Result> CreateCompleteResult(
+      const std::vector<std::shared_ptr<Result>> &partial_results);
+
+  /** Merges partial_results_ into one Result and clears the buffer. */
+  std::shared_ptr<Result> Combine();
+
+  /** Returns the combined buffered row followed by the first `length` entries of `results`.
+   * results[0] is folded into the buffered row first when it has the same row key. */
+  std::vector<std::shared_ptr<Result>> PrependCombined(
+      const std::vector<std::shared_ptr<Result>> &results, int length);
+
+  /** Counts `result` in num_complete_rows_ and returns it as a one-element vector. */
+  std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn(
+      const std::shared_ptr<Result> &result);
+
+  /** Counts `results` in num_complete_rows_ and returns them unchanged. */
+  std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn(
+      const std::vector<std::shared_ptr<Result>> &results);
+
+ private:
+  // Chunks of the row currently being reassembled, in arrival order.
+  std::vector<std::shared_ptr<Result>> partial_results_;
+  // Running count of complete Results returned by AddAndGet.
+  int64_t num_complete_rows_ = 0;
+};
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan-test.cc b/hbase-native-client/core/scan-test.cc
index 73fb6df..0ee054c 100644
--- a/hbase-native-client/core/scan-test.cc
+++ b/hbase-native-client/core/scan-test.cc
@@ -17,13 +17,13 @@
*
*/
-#include "core/scan.h"
-
+#include <gtest/gtest.h>
#include <limits>
-#include <gtest/gtest.h>
+#include "core/scan.h"
-using namespace hbase;
+using hbase::Get;
+using hbase::Scan;
void CheckFamilies(Scan &scan) {
EXPECT_EQ(false, scan.HasFamilies());
@@ -74,7 +74,7 @@ void CheckFamilies(Scan &scan) {
EXPECT_EQ(it, scan.FamilyMap().end());
}
-void CheckFamiliesAfterCopy(Scan &scan) {
+void CheckFamiliesAfterCopy(const Scan &scan) {
EXPECT_EQ(true, scan.HasFamilies());
EXPECT_EQ(3, scan.FamilyMap().size());
int i = 1;
@@ -116,11 +116,6 @@ void ScanMethods(Scan &scan) {
scan.SetStopRow(stop_row);
EXPECT_EQ(stop_row, scan.StopRow());
- scan.SetSmall(true);
- EXPECT_EQ(true, scan.IsSmall());
- scan.SetSmall(false);
- EXPECT_EQ(false, scan.IsSmall());
-
scan.SetCaching(3);
EXPECT_EQ(3, scan.Caching());
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan.cc b/hbase-native-client/core/scan.cc
index 5f315ec..6dcc51b 100644
--- a/hbase-native-client/core/scan.cc
+++ b/hbase-native-client/core/scan.cc
@@ -38,7 +38,7 @@ Scan::Scan(const std::string &start_row, const std::string &stop_row)
CheckRow(stop_row_);
}
-Scan::Scan(const Scan &scan) {
+Scan::Scan(const Scan &scan) : Query(scan) {
start_row_ = scan.start_row_;
stop_row_ = scan.stop_row_;
max_versions_ = scan.max_versions_;
@@ -47,7 +47,6 @@ Scan::Scan(const Scan &scan) {
cache_blocks_ = scan.cache_blocks_;
load_column_families_on_demand_ = scan.load_column_families_on_demand_;
reversed_ = scan.reversed_;
- small_ = scan.small_;
allow_partial_results_ = scan.allow_partial_results_;
consistency_ = scan.consistency_;
tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
@@ -55,6 +54,7 @@ Scan::Scan(const Scan &scan) {
}
Scan &Scan::operator=(const Scan &scan) {
+ Query::operator=(scan);
start_row_ = scan.start_row_;
stop_row_ = scan.stop_row_;
max_versions_ = scan.max_versions_;
@@ -63,7 +63,6 @@ Scan &Scan::operator=(const Scan &scan) {
cache_blocks_ = scan.cache_blocks_;
load_column_families_on_demand_ = scan.load_column_families_on_demand_;
reversed_ = scan.reversed_;
- small_ = scan.small_;
allow_partial_results_ = scan.allow_partial_results_;
consistency_ = scan.consistency_;
tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
@@ -109,24 +108,14 @@ void Scan::SetReversed(bool reversed) { reversed_ = reversed; }
bool Scan::IsReversed() const { return reversed_; }
-void Scan::SetStartRow(const std::string &start_row) {
- CheckRow(start_row);
- start_row_ = start_row;
-}
+// Sets the row the scan starts from. Unlike the two-argument Scan constructor, which calls
+// CheckRow(), no validation is done here, so e.g. an empty row is accepted.
+void Scan::SetStartRow(const std::string &start_row) { start_row_ = start_row; }
const std::string &Scan::StartRow() const { return start_row_; }
-void Scan::SetStopRow(const std::string &stop_row) {
- CheckRow(stop_row);
- stop_row_ = stop_row;
-}
+// Sets the row the scan stops at. Unlike the two-argument Scan constructor, which calls
+// CheckRow(), no validation is done here, so e.g. an empty row is accepted.
+void Scan::SetStopRow(const std::string &stop_row) { stop_row_ = stop_row; }
const std::string &Scan::StopRow() const { return stop_row_; }
-void Scan::SetSmall(bool small) { small_ = small; }
-
-bool Scan::IsSmall() const { return small_; }
-
void Scan::SetCaching(int caching) { caching_ = caching; }
int Scan::Caching() const { return caching_; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scan.h b/hbase-native-client/core/scan.h
index fb302b7..1085c4b 100644
--- a/hbase-native-client/core/scan.h
+++ b/hbase-native-client/core/scan.h
@@ -119,16 +119,6 @@ class Scan : public Query {
const std::string &StopRow() const;
/**
- * @brief Set whether this scan is a small scan.
- */
- void SetSmall(bool small);
-
- /**
- * @brief Returns if the scan is a small scan.
- */
- bool IsSmall() const;
-
- /**
* @brief Set the number of rows for caching that will be passed to scanners.
* Higher caching values will enable faster scanners but will use more memory.
* @param caching - the number of rows for caching.
@@ -258,12 +248,11 @@ class Scan : public Query {
std::string start_row_ = "";
std::string stop_row_ = "";
uint32_t max_versions_ = 1;
- int caching_ = -1;
+ int32_t caching_ = -1;
int64_t max_result_size_ = -1;
bool cache_blocks_ = true;
bool load_column_families_on_demand_ = false;
bool reversed_ = false;
- bool small_ = false;
bool allow_partial_results_ = false;
hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG;
std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scanner-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/scanner-test.cc b/hbase-native-client/core/scanner-test.cc
new file mode 100644
index 0000000..1ecbd02
--- /dev/null
+++ b/hbase-native-client/core/scanner-test.cc
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include "core/async-client-scanner.h"
+#include "core/async-table-result-scanner.h"
+#include "core/cell.h"
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/filter.h"
+#include "core/get.h"
+#include "core/hbase-configuration-loader.h"
+#include "core/put.h"
+#include "core/result.h"
+#include "core/row.h"
+#include "core/table.h"
+#include "if/Comparator.pb.h"
+#include "if/Filter.pb.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+#include "utils/time-util.h"
+
+using hbase::Cell;
+using hbase::ComparatorFactory;
+using hbase::Comparator;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::Put;
+using hbase::Result;
+using hbase::Scan;
+using hbase::Table;
+using hbase::TestUtil;
+using hbase::TimeUtil;
+using hbase::AsyncClientScanner;
+using hbase::AsyncTableResultScanner;
+using hbase::FilterFactory;
+using hbase::pb::CompareType;
+
+// Fixture shared by all scanner tests. A single mini cluster is started once for the whole test
+// case and kept in the static `test_util`.
+// NOTE(review): no TearDownTestCase is defined, so test_util is only destroyed at program exit.
+class ScannerTest : public ::testing::Test {
+ public:
+  static std::unique_ptr<hbase::TestUtil> test_util;
+  // Number of rows the tests in this file write and scan.
+  static const uint32_t num_rows;
+
+  // Runs once before the first test. It installs glog's failure signal handler, so a crash
+  // prints a stack trace, and then starts the mini cluster. The argument 2 is presumably the
+  // number of region servers (TODO confirm against TestUtil::StartMiniCluster).
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+  }
+};
+std::unique_ptr<hbase::TestUtil> ScannerTest::test_util = nullptr;
+const uint32_t ScannerTest::num_rows = 1000;
+
+// Column family name for index i: "f0", "f1", ...
+std::string Family(uint32_t i) {
+  const std::string prefix = "f";
+  return prefix + std::to_string(i);
+}
+
+// Row key "row" + i, with i left-padded with zeros to at least `width` digits.
+std::string Row(uint32_t i, int width) {
+  std::string digits = std::to_string(i);
+  if (width > 0 && digits.size() < static_cast<size_t>(width)) {
+    digits.insert(digits.begin(), static_cast<size_t>(width) - digits.size(), '0');
+  }
+  return "row" + digits;
+}
+
+// Row key with the default 3-digit padding, e.g. Row(7) == "row007".
+std::string Row(uint32_t i) {
+  const int default_width = 3;
+  return Row(i, default_width);
+}
+
+// Builds a Put for `row` writing, in each family f0..f{num_families-1}, q1 = row and
+// q2 = row + "-" + row. CheckResult() expects exactly this layout.
+std::unique_ptr<Put> MakePut(const std::string &row, uint32_t num_families) {
+  auto put = std::make_unique<Put>(row);
+
+  for (uint32_t i = 0; i < num_families; i++) {
+    put->AddColumn(Family(i), "q1", row);
+    put->AddColumn(Family(i), "q2", row + "-" + row);
+  }
+
+  // Plain `return put;` moves implicitly; std::move here would only inhibit copy elision.
+  return put;
+}
+
+// Verifies that `r` is row `expected_row` and holds, for each family f0..f{num_families-1},
+// exactly the q1/q2 values written by MakePut().
+void CheckResult(const Result &r, const std::string &expected_row, uint32_t num_families) {
+  VLOG(1) << r.DebugString();
+  auto row = r.Row();
+  ASSERT_EQ(expected_row, row);
+  ASSERT_EQ(num_families * 2, r.Cells().size());
+  for (uint32_t i = 0; i < num_families; i++) {
+    // Value() returns an optional. Check that it is set before dereferencing, so a missing
+    // column fails the test with a message instead of dereferencing an empty optional.
+    auto q1 = r.Value(Family(i), "q1");
+    ASSERT_TRUE(static_cast<bool>(q1)) << "missing " << Family(i) << ":q1 in row " << row;
+    ASSERT_EQ(row, *q1);
+    auto q2 = r.Value(Family(i), "q2");
+    ASSERT_TRUE(static_cast<bool>(q2)) << "missing " << Family(i) << ":q2 in row " << row;
+    ASSERT_EQ(row + "-" + row, *q2);
+  }
+}
+
+// Creates `table_name` with families f0..f{num_families-1}. With more than one region the table
+// is pre-split at the num_regions - 1 keys Row(k * (num_rows / (num_regions - 1))).
+void CreateTable(std::string table_name, uint32_t num_families, uint32_t num_rows,
+                 int32_t num_regions) {
+  LOG(INFO) << "Creating the table " << table_name
+            << " with num_regions:" << folly::to<std::string>(num_regions);
+  std::vector<std::string> families;
+  families.reserve(num_families);
+  for (uint32_t f = 0; f < num_families; ++f) {
+    families.push_back(Family(f));
+  }
+
+  if (num_regions <= 1) {
+    ScannerTest::test_util->CreateTable(table_name, families);
+    return;
+  }
+
+  const uint32_t step = num_rows / (num_regions - 1);
+  std::vector<std::string> split_keys;
+  for (int32_t k = 0; k < num_regions - 1; ++k) {
+    split_keys.push_back(Row(k * step));
+    LOG(INFO) << "Split key:" << split_keys.back();
+  }
+  ScannerTest::test_util->CreateTable(table_name, families, split_keys);
+}
+
+// Creates `table_name` (see CreateTable) and writes rows Row(0)..Row(num_rows - 1), each built by
+// MakePut(). Returns the client the caller then uses to open the table.
+std::unique_ptr<hbase::Client> CreateTableAndWriteData(const std::string &table_name,
+                                                       uint32_t num_families, uint32_t num_rows,
+                                                       int32_t num_regions) {
+  CreateTable(table_name, num_families, num_rows, num_regions);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  LOG(INFO) << "Writing data to the table, num_rows:" << num_rows;
+  // Perform Puts
+  for (uint32_t i = 0; i < num_rows; i++) {
+    table->Put(*MakePut(Row(i), num_families));
+  }
+  // Plain `return client;` moves implicitly; std::move here would only inhibit copy elision.
+  return client;
+}
+
+// Runs `scan` and expects rows Row(start), Row(start + 1), ... in order, each with the MakePut()
+// layout for `num_families` families, and exactly `num_rows` of them in total.
+void TestScan(const Scan &scan, uint32_t num_families, int32_t start, int32_t num_rows,
+              Table *table) {
+  LOG(INFO) << "Starting scan for the test with start:" << scan.StartRow()
+            << ", stop:" << scan.StopRow() << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  uint32_t scanned = 0;
+  for (auto r = scanner->Next(); r != nullptr; r = scanner->Next()) {
+    CheckResult(*r, Row(start + scanned), num_families);
+    ++scanned;
+  }
+  ASSERT_EQ(scanned, num_rows);
+}
+
+// Single-family shorthand for TestScan(scan, num_families, start, num_rows, table).
+void TestScan(const Scan &scan, int32_t start, int32_t num_rows, Table *table) {
+  const uint32_t single_family = 1;
+  TestScan(scan, single_family, start, num_rows, table);
+}
+
+// Scans [Row(start), Row(stop)) and checks the rows. A negative start or stop leaves that bound
+// unset; with no start bound the first expected row is Row(0).
+void TestScan(uint32_t num_families, int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  Scan scan{};
+  if (start >= 0) {
+    scan.SetStartRow(Row(start));
+  }
+  if (stop >= 0) {
+    scan.SetStopRow(Row(stop));
+  }
+
+  const int32_t first_expected = start >= 0 ? start : 0;
+  TestScan(scan, num_families, first_expected, num_rows, table);
+}
+
+// Single-family shorthand for TestScan(num_families, start, stop, num_rows, table).
+void TestScan(int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  const uint32_t single_family = 1;
+  TestScan(single_family, start, stop, num_rows, table);
+}
+
+// Scans between arbitrary string bounds. Row contents are not checked, only the number of rows
+// and the number of families present in each row.
+void TestScan(uint32_t num_families, std::string start, std::string stop, int32_t num_rows,
+              Table *table) {
+  Scan scan{};
+  scan.SetStartRow(start);
+  scan.SetStopRow(stop);
+
+  LOG(INFO) << "Starting scan for the test with start:" << start << ", stop:" << stop
+            << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  uint32_t seen = 0;
+  for (auto r = scanner->Next(); r != nullptr; r = scanner->Next()) {
+    VLOG(1) << r->DebugString();
+    ++seen;
+    ASSERT_EQ(r->Map().size(), num_families);
+  }
+  ASSERT_EQ(seen, num_rows);
+}
+
+// Single-family shorthand for the string-bounded TestScan.
+void TestScan(std::string start, std::string stop, int32_t num_rows, Table *table) {
+  const uint32_t single_family = 1;
+  TestScan(single_family, start, stop, num_rows, table);
+}
+
+// Runs a fixed battery of range scans against a table holding Row(0)..Row(999).
+// Each case is {start, stop, expected number of rows}, executed in the listed order.
+void TestScanCombinations(Table *table, uint32_t num_families) {
+  // Numeric bounds are row indexes; a negative value means the bound is left unset.
+  struct RowRangeCase {
+    int32_t start;
+    int32_t stop;
+    int32_t expected;
+  };
+  // String bounds are used verbatim as row keys.
+  struct KeyRangeCase {
+    std::string start;
+    std::string stop;
+    int32_t expected;
+  };
+
+  const std::vector<RowRangeCase> row_cases = {
+      // full table
+      {-1, -1, 1000},
+      {-1, 999, 999},
+      {0, -1, 1000},
+      {0, 999, 999},
+      {10, 990, 980},
+      {1, 998, 997},
+
+      {123, 345, 222},
+      {234, 456, 222},
+      {345, 567, 222},
+      {456, 678, 222},
+
+      // single results (for the 10-region table split keys are like 111, 222, 333, etc)
+      {111, 111, 1},
+      {111, 112, 1},
+      {332, 332, 1},
+      {332, 333, 1},
+      {333, 333, 1},
+      {333, 334, 1},
+      {42, 42, 1},
+      {921, 921, 1},
+      {0, 0, 1},
+      {0, 1, 1},
+      {999, 999, 1},
+
+      // few results
+      {0, 0, 1},
+      {0, 2, 2},
+      {0, 5, 5},
+      {10, 15, 5},
+      {105, 115, 10},
+      {111, 221, 110},
+      {111, 222, 111},  // crossing region boundary 111-222
+      {111, 223, 112},
+      {111, 224, 113},
+      {990, 999, 9},
+      {900, 998, 98},
+  };
+  for (const auto &c : row_cases) {
+    TestScan(num_families, c.start, c.stop, c.expected, table);
+  }
+
+  const std::vector<KeyRangeCase> key_cases = {
+      // empty results
+      {"a", "a", 0},
+      {"a", "r", 0},
+      {"", "r", 0},
+      {"s", "", 0},
+      {"s", "z", 0},
+      {Row(110) + "a", Row(111), 0},
+      {Row(111) + "a", Row(112), 0},
+      {Row(123) + "a", Row(124), 0},
+
+      // custom
+      {Row(111, 3), Row(1111, 4), 1},
+      {Row(0, 3), Row(0, 4), 1},
+      {Row(999, 3), Row(9999, 4), 1},
+      {Row(111, 3), Row(1111, 4), 1},
+      {Row(0, 3), Row(9999, 4), 1000},
+      {"a", "z", 1000},
+  };
+  for (const auto &c : key_cases) {
+    TestScan(num_families, c.start, c.stop, c.expected, table);
+  }
+}
+
+// some of these tests are from TestAsyncTableScan* and some from TestFromClientSide* and
+// TestScannersFromClientSide*
+
+TEST_F(ScannerTest, SingleRegionScan) {
+  // One table name shared by the create/write step and the lookup.
+  std::string table_name{"t_single_region_scan"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 1);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client->Table(tn);
+
+  TestScanCombinations(table.get(), 1);
+}
+
+TEST_F(ScannerTest, MultiRegionScan) {
+  // Same scan matrix as SingleRegionScan, but the table is created with 10 (instead of 1) as
+  // the last CreateTableAndWriteData argument.
+  std::string table_name{"t_multi_region_scan"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 10);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client->Table(tn);
+
+  TestScanCombinations(table.get(), 1);
+}
+
+TEST_F(ScannerTest, ScanWithPauses) {
+  // Temporarily shrink hbase.client.scanner.max.result.size on the shared test configuration;
+  // the previous value is restored once the scan has finished.
+  auto max_result_size =
+      ScannerTest::test_util->conf()->GetInt("hbase.client.scanner.max.result.size", 2097152);
+  ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", 100);
+
+  // Dedicated table name: "t_multi_region_scan" is already used by MultiRegionScan, so
+  // creating it again here (with a different region count) would collide with that table.
+  std::string table_name{"t_scan_with_pauses"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 5);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  VLOG(1) << "Starting scan for the test";
+  Scan scan{};
+  scan.SetCaching(100);
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = 0;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    CheckResult(*r, Row(i++), 1);
+    r = scanner->Next();
+    // Consume slowly so that prefetching gets stopped at least once (checked below via
+    // num_prefetch_stopped()).
+    std::this_thread::sleep_for(TimeUtil::MillisToNanos(10));
+  }
+
+  // Restore before asserting: a failed ASSERT_* returns from the test body early and would
+  // otherwise leave the tiny max.result.size in the shared configuration for later tests.
+  ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", max_result_size);
+
+  auto s = static_cast<AsyncTableResultScanner *>(scanner.get());
+  ASSERT_GT(s->num_prefetch_stopped(), 0);
+
+  ASSERT_EQ(i, num_rows);
+}
+
+TEST_F(ScannerTest, ScanWithFilters) {
+  std::string table_name{"t_scan_with_filters"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 1);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client->Table(tn);
+
+  // ValueFilter keeping cells whose value is >= Row(800) under a BinaryComparator.
+  auto lower_bound = ComparatorFactory::BinaryComparator(Row(800));
+  Scan scan{};
+  scan.SetFilter(FilterFactory::ValueFilter(CompareType::GREATER_OR_EQUAL, *lower_bound));
+
+  // NOTE(review): TestScan(Scan, int, int, Table*) is defined elsewhere; 800 and 200 are
+  // presumably the first expected row index and the expected row count.
+  TestScan(scan, 800, 200, table.get());
+}
+
+TEST_F(ScannerTest, ScanMultiFamily) {
+  // Same scan matrix as the single-family tests, but with 3 column families.
+  constexpr int kNumFamilies = 3;
+  std::string table_name{"t_scan_multi_family"};
+  auto client = CreateTableAndWriteData(table_name, kNumFamilies, num_rows, 1);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client->Table(tn);
+
+  TestScanCombinations(table.get(), kNumFamilies);
+}
+
+TEST_F(ScannerTest, ScanNullQualifier) {
+  std::string table_name{"t_scan_null_qualifier"};
+  std::string row{"row"};
+  CreateTable(table_name, 1, 1, 1);
+
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  // Write one row with two cells in family 0: qualifier "q1" and the empty qualifier.
+  Put put{row};
+  put.AddColumn(Family(0), "q1", row);
+  put.AddColumn(Family(0), "", row);
+  table->Put(put);
+
+  // Selecting only the empty qualifier must return exactly one cell.
+  Scan scan1{};
+  scan1.AddColumn(Family(0), "");
+  auto scanner1 = table->Scan(scan1);
+  auto r1 = scanner1->Next();
+  // Fail cleanly rather than dereferencing a null Result and crashing the test binary.
+  ASSERT_NE(r1, nullptr);
+  ASSERT_EQ(r1->Cells().size(), 1U);
+  ASSERT_EQ(scanner1->Next(), nullptr);
+
+  // Selecting the whole family must return both cells.
+  Scan scan2{};
+  scan2.AddFamily(Family(0));
+  auto scanner2 = table->Scan(scan2);
+  auto r2 = scanner2->Next();
+  ASSERT_NE(r2, nullptr);
+  ASSERT_EQ(r2->Cells().size(), 2U);
+  ASSERT_EQ(scanner2->Next(), nullptr);
+}
+
+TEST_F(ScannerTest, ScanNoResults) {
+  std::string table_name{"t_scan_no_results"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 3);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto table = client->Table(tn);
+
+  // Request only a qualifier that was never written, so no row should come back.
+  Scan scan{};
+  scan.AddColumn(Family(0), "non_existing_qualifier");
+
+  TestScan(scan, 0, 0, table.get());
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 3a7d62b..f79d848 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -30,6 +30,7 @@
#include "core/client.h"
#include "core/get.h"
#include "core/put.h"
+#include "core/scan.h"
#include "core/table.h"
#include "serde/server-name.h"
#include "serde/table-name.h"
@@ -38,6 +39,7 @@
using hbase::Client;
using hbase::Configuration;
using hbase::Get;
+using hbase::Scan;
using hbase::Put;
using hbase::Table;
using hbase::pb::TableName;
@@ -48,6 +50,10 @@ DEFINE_string(table, "test_table", "What table to do the reads or writes");
DEFINE_string(row, "row_", "row prefix");
DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
DEFINE_uint64(num_rows, 10000, "How many rows to write and read");
+// Phase toggles: each flag gates one section of main() (Puts, Gets, multi-Get, Scan).
+DEFINE_bool(puts, true, "Whether to perform puts");
+DEFINE_bool(gets, true, "Whether to perform gets");
+DEFINE_bool(multigets, true, "Whether to perform multi-gets");
+DEFINE_bool(scans, true, "Whether to perform scans");
DEFINE_bool(display_results, false, "Whether to display the Results from Gets");
DEFINE_int32(threads, 6, "How many cpu threads");
@@ -86,41 +92,72 @@ int main(int argc, char *argv[]) {
auto start_ns = TimeUtil::GetNowNanos();
// Do the Put requests
- for (uint64_t i = 0; i < num_puts; i++) {
- table->Put(*MakePut(Row(FLAGS_row, i)));
- }
+ if (FLAGS_puts) {
+ LOG(INFO) << "Sending put requests";
+ for (uint64_t i = 0; i < num_puts; i++) {
+ table->Put(*MakePut(Row(FLAGS_row, i)));
+ }
- LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
- << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ }
// Do the Get requests
- start_ns = TimeUtil::GetNowNanos();
- for (uint64_t i = 0; i < num_puts; i++) {
- auto result = table->Get(Get{Row(FLAGS_row, i)});
- if (FLAGS_display_results) {
- LOG(INFO) << result->DebugString();
+ if (FLAGS_gets) {
+ LOG(INFO) << "Sending get requests";
+ start_ns = TimeUtil::GetNowNanos();
+ for (uint64_t i = 0; i < num_puts; i++) {
+ auto result = table->Get(Get{Row(FLAGS_row, i)});
+ if (FLAGS_display_results) {
+ LOG(INFO) << result->DebugString();
+ }
}
- }
- LOG(INFO) << "Successfully sent " << num_puts << " Get requests in "
- << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ LOG(INFO) << "Successfully sent " << num_puts << " Get requests in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ }
// Do the Multi-Gets
- std::vector<hbase::Get> gets;
- for (uint64_t i = 0; i < num_puts; ++i) {
- hbase::Get get(Row(FLAGS_row, i));
- gets.push_back(get);
- }
+ if (FLAGS_multigets) {
+ std::vector<hbase::Get> gets;
+ for (uint64_t i = 0; i < num_puts; ++i) {
+ hbase::Get get(Row(FLAGS_row, i));
+ gets.push_back(get);
+ }
+
+ LOG(INFO) << "Sending multi-get requests";
+ start_ns = TimeUtil::GetNowNanos();
+ auto results = table->Get(gets);
- start_ns = TimeUtil::GetNowNanos();
- auto results = table->Get(gets);
+ if (FLAGS_display_results) {
+ for (const auto &result : results) LOG(INFO) << result->DebugString();
+ }
- if (FLAGS_display_results) {
- for (const auto &result : results) LOG(INFO) << result->DebugString();
+ LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
- LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in "
- << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ // Do the Scan
+ if (FLAGS_scans) {
+ LOG(INFO) << "Starting scanner";
+ start_ns = TimeUtil::GetNowNanos();
+ Scan scan{};
+ auto scanner = table->Scan(scan);
+
+ uint64_t i = 0;
+ auto r = scanner->Next();
+ while (r != nullptr) {
+ if (FLAGS_display_results) {
+ LOG(INFO) << r->DebugString();
+ }
+ r = scanner->Next();
+ i++;
+ }
+
+ LOG(INFO) << "Successfully iterated over " << i << " Scan results in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ scanner->Close();
+ }
table->Close();
client->Close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index aa51989..9cdd8b0 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -25,6 +25,7 @@
#include <vector>
#include "core/async-connection.h"
+#include "core/async-table-result-scanner.h"
#include "core/request-converter.h"
#include "core/response-converter.h"
#include "if/Client.pb.h"
@@ -52,6 +53,21 @@ std::shared_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
return context.get(operation_timeout());
}
+/**
+ * Starts a scan over this table and hands back a scanner to iterate its Results.
+ * The scanner's cache size is derived from the Scan's own max result size, or from the
+ * connection-wide scanner max result size when the Scan does not set a positive one.
+ */
+std::shared_ptr<ResultScanner> Table::Scan(const hbase::Scan &scan) {
+  const int64_t result_size_limit =
+      (scan.MaxResultSize() > 0)
+          ? scan.MaxResultSize()
+          : async_connection_->connection_conf()->scanner_max_result_size();
+  auto result_scanner =
+      std::make_shared<AsyncTableResultScanner>(ResultSize2CacheSize(result_size_limit));
+  // The scanner is passed to the async table as the consumer of this scan's results.
+  async_table_->Scan(scan, result_scanner);
+  return result_scanner;
+}
+
+/**
+ * Returns twice `max_results_size`, or `max_results_size` unchanged when doubling it would
+ * exceed std::numeric_limits<int64_t>::max().
+ */
+int64_t Table::ResultSize2CacheSize(int64_t max_results_size) const {
+  constexpr int64_t kDoublingLimit = std::numeric_limits<int64_t>::max() / 2;
+  if (max_results_size > kDoublingLimit) {
+    return max_results_size;
+  }
+  return max_results_size * 2;
+}
+
void Table::Put(const hbase::Put &put) {
auto future = async_table_->Put(put);
future.get(operation_timeout());
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 81ddc8e..1f6d9b7 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -32,6 +32,7 @@
#include "core/location-cache.h"
#include "core/put.h"
#include "core/raw-async-table.h"
+#include "core/result-scanner.h"
#include "core/result.h"
#include "serde/table-name.h"
@@ -74,6 +75,8 @@ class Table {
std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment);
// TODO: Batch Puts
+ /**
+ * @brief Starts a scan over this table and returns a scanner over its Results.
+ * The scanner's cache size is derived from scan.MaxResultSize(), or from the connection's
+ * scanner max result size when the Scan does not set a positive value.
+ * @param scan - the Scan describing rows/columns/filters to read.
+ * @return scanner whose Next() yields Results until it returns nullptr.
+ */
+ std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan);
+
/**
* @brief - Close the client connection.
*/
@@ -92,5 +95,7 @@ class Table {
private:
std::chrono::milliseconds operation_timeout() const;
+
+ /**
+ * @brief Derives a scanner cache size from a max-result-size value: twice the input, or the
+ * input unchanged when doubling would overflow int64_t.
+ */
+ int64_t ResultSize2CacheSize(int64_t max_results_size) const;
};
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/exceptions/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK
index eef4437..07ffeaf 100644
--- a/hbase-native-client/exceptions/BUCK
+++ b/hbase-native-client/exceptions/BUCK
@@ -17,8 +17,12 @@
cxx_library(
name="exceptions",
- exported_headers=["exception.h",],
+ exported_headers=[
+ "exception.h",
+ ],
srcs=[],
- deps=["//third-party:folly",],
+ deps=[
+ "//third-party:folly",
+ ],
compiler_flags=['-Weffc++'],
- visibility=['//core/...','//connection//...'],)
\ No newline at end of file
+ # Recursive target patterns use a single slash before "...": //pkg/...
+ visibility=['//core/...', '//connection/...'],)