You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/08/11 18:09:41 UTC
hbase git commit: HBASE-18537 [C++] Improvements to load-client
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 9786a07ee -> e5643e863
HBASE-18537 [C++] Improvements to load-client
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5643e86
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5643e86
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5643e86
Branch: refs/heads/HBASE-14850
Commit: e5643e8632f8d4c916f1e1ca9537612b7370ace1
Parents: 9786a07
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Aug 11 11:09:34 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Aug 11 11:09:34 2017 -0700
----------------------------------------------------------------------
hbase-native-client/core/load-client.cc | 213 +++++++++++++++----------
hbase-native-client/core/simple-client.cc | 4 +-
2 files changed, 135 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5643e86/hbase-native-client/core/load-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/load-client.cc b/hbase-native-client/core/load-client.cc
index 67f0d57..8cceeef 100644
--- a/hbase-native-client/core/load-client.cc
+++ b/hbase-native-client/core/load-client.cc
@@ -63,16 +63,17 @@ static constexpr const char *appendPrefix = "a";
std::string PrefixZero(int total_width, int num) {
std::string str = std::to_string(num);
- auto prefix_len = total_width - str.length();
- if (prefix_len > 0) return std::string(total_width - str.length(), '0') + str;
+ int prefix_len = total_width - str.length();
+ if (prefix_len > 0) {
+ return std::string(prefix_len, '0') + str;
+ }
return str;
}
bool Verify(std::shared_ptr<hbase::Result> result, std::string family, int m) {
auto col = std::to_string(m);
- auto int_val = hbase::BytesUtil::ToInt64(*(result->Value(family, incrPrefix + col)));
- if (int_val != m) {
- LOG(ERROR) << "value is not " << col << " for " << result->Row();
+ if (!result->Value(family, col)) {
+ LOG(ERROR) << "Column:" << col << " is not found for " << result->Row();
return false;
}
auto l = *(result->Value(family, col));
@@ -80,11 +81,52 @@ bool Verify(std::shared_ptr<hbase::Result> result, std::string family, int m) {
LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col;
return false;
}
- l = *(result->Value(family, appendPrefix + col));
- if (l != col) {
- LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col;
+ if (FLAGS_appends) {
+ if (!result->Value(family, incrPrefix + col)) {
+ LOG(ERROR) << "Column:" << (incrPrefix + col) << " is not found for " << result->Row();
+ return false;
+ }
+ auto int_val = hbase::BytesUtil::ToInt64(*(result->Value(family, incrPrefix + col)));
+ if (int_val != m) {
+ LOG(ERROR) << "value is not " << col << " for " << result->Row();
+ return false;
+ }
+ if (!result->Value(family, appendPrefix + col)) {
+ LOG(ERROR) << "Column:" << (appendPrefix + col) << " is not found for " << result->Row();
+ return false;
+ }
+ l = *(result->Value(family, appendPrefix + col));
+ if (l != col) {
+ LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col;
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool Verify(std::shared_ptr<hbase::Result> result, const std::string &row,
+ const std::vector<std::string> &families) {
+ if (result == nullptr || result->IsEmpty()) {
+ LOG(ERROR) << "didn't get result";
return false;
}
+ if (result->Row().compare(row) != 0) {
+ LOG(ERROR) << "row " << result->Row() << " is not the expected: " << row;
+ return false;
+ }
+ // Test the values
+ for (auto family : families) {
+ if (!result->Value(family, kNumColumn)) {
+ LOG(ERROR) << "Column:" << kNumColumn << " is not found for " << result->Row();
+ return false;
+ }
+ auto cols = std::stoi(*(result->Value(family, kNumColumn)));
+ VLOG(3) << "Result for row:" << row << " contains " << std::to_string(cols) << " columns";
+ for (int m = 1; m <= cols; m++) {
+ if (!Verify(result, family, m)) return false;
+ }
+ }
return true;
}
@@ -95,35 +137,46 @@ bool DoScan(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Tabl
auto end = start + rows;
auto width = std::to_string(max_row).length();
scan.SetStartRow(PrefixZero(width, start));
- scan.SetStopRow(PrefixZero(width, end));
+ if (end != max_row && end != max_row + 1) {
+ scan.SetStopRow(PrefixZero(width, end));
+ }
+
+ auto start_ns = TimeUtil::GetNowNanos();
auto scanner = table->Scan(scan);
- auto cnt = start;
+ auto cnt = 0;
auto r = scanner->Next();
while (r != nullptr) {
- auto row = PrefixZero(width, cnt);
- if (r->Row().compare(row) != 0) {
- LOG(ERROR) << "row " << r->Row() << " is not the expected: " << row;
+ auto row = PrefixZero(width, start + cnt);
+ if (!Verify(r, row, families)) {
return false;
}
- for (auto family : families) {
- auto cols = std::stoi(*(r->Value(family, kNumColumn)));
- VLOG(3) << "scan gets " << std::to_string(cols) << " columns";
- for (int m = 1; m <= cols; m++) {
- if (!Verify(r, family, m)) return false;
- }
- }
cnt++;
r = scanner->Next();
+ if (cnt != 0 && cnt % FLAGS_report_num_rows == 0) {
+ LOG(INFO) << "(Thread " << iteration << ") "
+ << "Scan iterated over " << cnt << " results in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ }
+ }
+ if (cnt != rows) {
+ LOG(ERROR) << "(Thread " << iteration << ") "
+ << "Expected number of results does not match. expected:" << rows
+ << ", actual:" << cnt;
+ return false;
}
- LOG(INFO) << "Iteration " << iteration << " scanned " << std::to_string(cnt - start) << " rows";
+ LOG(INFO) << "(Thread " << iteration << ") "
+ << "scanned " << std::to_string(cnt) << " rows in " << TimeUtil::ElapsedMillis(start_ns)
+ << " ms.";
return true;
}
bool DoGet(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table> table,
- const std::vector<std::string> &families, int batch_num_rows) {
+ const std::vector<std::string> &families, uint64_t batch_num_rows) {
auto width = std::to_string(max_row).length();
+ auto start_ns = TimeUtil::GetNowNanos();
for (uint64_t k = iteration; k <= max_row;) {
+ uint64_t total_read = 0;
std::vector<hbase::Get> gets;
for (uint64_t i = 0; i < batch_num_rows && k <= max_row; ++i, k += FLAGS_threads) {
std::string row = PrefixZero(width, k);
@@ -132,29 +185,34 @@ bool DoGet(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table
}
VLOG(3) << "getting for " << batch_num_rows << " rows";
auto results = table->Get(gets);
+ if (results.size() != gets.size()) {
+ LOG(ERROR) << "(Thread " << iteration << ") "
+ << "Expected number of results does not match. expected:" << gets.size()
+ << ", actual:" << results.size();
+ return false;
+ }
for (uint64_t i = 0; i < batch_num_rows && i < results.size(); ++i) {
- auto result = results[i];
- if (result == nullptr) {
- LOG(ERROR) << "didn't get result";
+ if (!Verify(results[i], gets[i].row(), families)) {
return false;
}
- // Test the values
- for (auto family : families) {
- auto cols = std::stoi(*(result->Value(family, kNumColumn)));
- VLOG(3) << "gets " << std::to_string(cols) << " columns";
- for (int m = 1; m <= cols; m++) {
- if (!Verify(result, family, m)) return false;
- }
- }
+ }
+ total_read += gets.size();
+ if (total_read != 0 && total_read % FLAGS_report_num_rows == 0) {
+ LOG(INFO) << "(Thread " << iteration << ") "
+ << "Sent " << total_read << " Multi-Get requests in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
k += batch_num_rows;
}
- LOG(INFO) << "Sent " << rows << " gets in iteration " << iteration;
+ LOG(INFO) << "(Thread " << iteration << ") "
+ << "Sent " << rows << " gets"
+ << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
return true;
}
void DoPut(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique_ptr<Table> table,
const std::vector<std::string> &families) {
+ auto start_ns = TimeUtil::GetNowNanos();
auto width = std::to_string(max_row).length();
for (uint64_t j = 0; j < rows; j++) {
std::string row = PrefixZero(width, iteration * rows + j);
@@ -167,15 +225,20 @@ void DoPut(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique
}
}
table->Put(put);
- if ((j + 1) % FLAGS_report_num_rows == 0)
- LOG(INFO) << "Written " << std::to_string(j + 1) << " rows";
+ if ((j + 1) % FLAGS_report_num_rows == 0) {
+ LOG(INFO) << "(Thread " << iteration << ") "
+ << "Written " << std::to_string(j + 1) << " rows in "
+ << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ }
}
- LOG(INFO) << "written " << std::to_string(rows) << " rows in " << std::to_string(iteration)
- << " iteration";
+ LOG(INFO) << "(Thread " << iteration << ") "
+ << "written " << std::to_string(rows) << " rows"
+ << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
bool DoAppendIncrement(int iteration, uint64_t max_row, uint64_t rows, int cols,
std::unique_ptr<Table> table, const std::vector<std::string> &families) {
+ auto start_ns = TimeUtil::GetNowNanos();
auto width = std::to_string(max_row).length();
for (uint64_t j = 0; j < rows; j++) {
std::string row = PrefixZero(width, iteration * rows + j);
@@ -188,20 +251,26 @@ bool DoAppendIncrement(int iteration, uint64_t max_row, uint64_t rows, int cols,
hbase::Increment{row}.AddColumn(family, incrPrefix + std::to_string(k), k));
if (!table->Append(hbase::Append{row}.Add(family, appendPrefix + std::to_string(k),
std::to_string(k)))) {
- LOG(ERROR) << "append for " << row << " family: " << family << " failed";
+ LOG(ERROR) << "(Thread " << iteration << ") "
+ << "append for " << row << " family: " << family << " failed";
return false;
}
}
}
if ((j + 1) % FLAGS_report_num_rows == 0)
- LOG(INFO) << "Written " << std::to_string(j + 1) << " increments";
+ LOG(INFO) << "(Thread " << iteration << ") "
+ << "Written " << std::to_string(j + 1) << " increments"
+ << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
+ LOG(INFO) << "(Thread " << iteration << ") "
+ << "written " << std::to_string(rows) << " increments"
+ << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
return true;
}
int main(int argc, char *argv[]) {
- google::SetUsageMessage("Load client to manipulate multiple rows from HBase on the comamnd line");
- google::ParseCommandLineFlags(&argc, &argv, true);
+ gflags::SetUsageMessage("Load client to manipulate multiple rows from HBase on the comamnd line");
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
google::InstallFailureSignalHandler();
FLAGS_logtostderr = 1;
@@ -244,94 +313,78 @@ int main(int argc, char *argv[]) {
int rows = FLAGS_num_rows / FLAGS_threads;
if (FLAGS_num_rows % FLAGS_threads != 0) rows++;
int cols = FLAGS_num_cols;
- bool succeeded = true;
+ std::atomic<int8_t> succeeded{1}; // not using bool since we want atomic &=
if (FLAGS_puts) {
+ LOG(INFO) << "Sending put requests";
auto start_ns = TimeUtil::GetNowNanos();
std::vector<std::thread> writer_threads;
for (int i = 0; i < FLAGS_threads; i++) {
- writer_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] {
- // Get connection to HBase Table
+ writer_threads.push_back(std::thread([&, i] {
auto table = client->Table(*tn);
-
DoPut(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families);
- table->Close();
}));
}
- for (std::vector<std::thread>::iterator it = writer_threads.begin(); it != writer_threads.end();
- it++) {
- std::thread thread = std::move(*it);
- thread.join();
+ for (auto &t : writer_threads) {
+ t.join();
}
LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
<< TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
if (FLAGS_appends) {
+ LOG(INFO) << "Sending append/increment requests";
auto start_ns = TimeUtil::GetNowNanos();
std::vector<std::thread> writer_threads;
for (int i = 0; i < FLAGS_threads; i++) {
- writer_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] {
- // Get connection to HBase Table
+ writer_threads.push_back(std::thread([&, i] {
auto table = client->Table(*tn);
-
succeeded &=
DoAppendIncrement(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families);
- table->Close();
}));
}
- for (std::vector<std::thread>::iterator it = writer_threads.begin(); it != writer_threads.end();
- it++) {
- std::thread thread = std::move(*it);
- thread.join();
+ for (auto &t : writer_threads) {
+ t.join();
}
LOG(INFO) << "Successfully sent " << num_puts << " append requests in "
<< TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
if (FLAGS_scans) {
+ LOG(INFO) << "Sending scan requests";
auto start_ns = TimeUtil::GetNowNanos();
std::vector<std::thread> reader_threads;
for (int i = 0; i < FLAGS_threads; i++) {
- reader_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] {
- // Get connection to HBase Table
+ reader_threads.push_back(std::thread([&, i] {
auto table1 = client->Table(*tn);
-
succeeded &= DoScan(i, FLAGS_num_rows - 1, rows, std::move(table1), families);
- table1->Close();
}));
}
- for (std::vector<std::thread>::iterator it = reader_threads.begin(); it != reader_threads.end();
- it++) {
- std::thread thread = std::move(*it);
- thread.join();
+ for (auto &t : reader_threads) {
+ t.join();
}
- LOG(INFO) << (succeeded ? "Successful. " : "Failed. ") << "Spent "
- << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ LOG(INFO) << (succeeded.load() ? "Successfully " : "Failed. ") << " scannned " << num_puts
+ << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
if (FLAGS_gets) {
+ LOG(INFO) << "Sending get requests";
auto start_ns = TimeUtil::GetNowNanos();
std::vector<std::thread> reader_threads;
for (int i = 0; i < FLAGS_threads; i++) {
- reader_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] {
- // Get connection to HBase Table
+ reader_threads.push_back(std::thread([&, i] {
auto table1 = client->Table(*tn);
-
succeeded &=
DoGet(i, FLAGS_num_rows - 1, rows, std::move(table1), families, FLAGS_batch_num_rows);
- table1->Close();
}));
}
- for (std::vector<std::thread>::iterator it = reader_threads.begin(); it != reader_threads.end();
- it++) {
- std::thread thread = std::move(*it);
- thread.join();
+ for (auto &t : reader_threads) {
+ t.join();
}
- LOG(INFO) << (succeeded ? "Successful. " : "Failed. ") << "Spent "
- << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+ LOG(INFO) << (succeeded.load() ? "Successful. " : "Failed. ") << " sent multi-get requests for "
+ << num_puts << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
client->Close();
- return succeeded ? 0 : -1;
+ return succeeded.load() ? 0 : -1;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5643e86/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index d36689e..6730248 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -82,8 +82,8 @@ void ValidateResult(const Result &result, const std::string &row) {
}
int main(int argc, char *argv[]) {
- google::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line");
- google::ParseCommandLineFlags(&argc, &argv, true);
+ gflags::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line");
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
google::InstallFailureSignalHandler();
FLAGS_logtostderr = 1;