You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/08/11 18:09:41 UTC

hbase git commit: HBASE-18537 [C++] Improvements to load-client

Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 9786a07ee -> e5643e863


HBASE-18537 [C++] Improvements to load-client


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5643e86
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5643e86
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5643e86

Branch: refs/heads/HBASE-14850
Commit: e5643e8632f8d4c916f1e1ca9537612b7370ace1
Parents: 9786a07
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Aug 11 11:09:34 2017 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Aug 11 11:09:34 2017 -0700

----------------------------------------------------------------------
 hbase-native-client/core/load-client.cc   | 213 +++++++++++++++----------
 hbase-native-client/core/simple-client.cc |   4 +-
 2 files changed, 135 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e5643e86/hbase-native-client/core/load-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/load-client.cc b/hbase-native-client/core/load-client.cc
index 67f0d57..8cceeef 100644
--- a/hbase-native-client/core/load-client.cc
+++ b/hbase-native-client/core/load-client.cc
@@ -63,16 +63,17 @@ static constexpr const char *appendPrefix = "a";
 
 std::string PrefixZero(int total_width, int num) {
   std::string str = std::to_string(num);
-  auto prefix_len = total_width - str.length();
-  if (prefix_len > 0) return std::string(total_width - str.length(), '0') + str;
+  int prefix_len = total_width - str.length();
+  if (prefix_len > 0) {
+    return std::string(prefix_len, '0') + str;
+  }
   return str;
 }
 
 bool Verify(std::shared_ptr<hbase::Result> result, std::string family, int m) {
   auto col = std::to_string(m);
-  auto int_val = hbase::BytesUtil::ToInt64(*(result->Value(family, incrPrefix + col)));
-  if (int_val != m) {
-    LOG(ERROR) << "value is not " << col << " for " << result->Row();
+  if (!result->Value(family, col)) {
+    LOG(ERROR) << "Column:" << col << " is not found for " << result->Row();
     return false;
   }
   auto l = *(result->Value(family, col));
@@ -80,11 +81,52 @@ bool Verify(std::shared_ptr<hbase::Result> result, std::string family, int m) {
     LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col;
     return false;
   }
-  l = *(result->Value(family, appendPrefix + col));
-  if (l != col) {
-    LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col;
+  if (FLAGS_appends) {
+    if (!result->Value(family, incrPrefix + col)) {
+      LOG(ERROR) << "Column:" << (incrPrefix + col) << " is not found for " << result->Row();
+      return false;
+    }
+    auto int_val = hbase::BytesUtil::ToInt64(*(result->Value(family, incrPrefix + col)));
+    if (int_val != m) {
+      LOG(ERROR) << "value is not " << col << " for " << result->Row();
+      return false;
+    }
+    if (!result->Value(family, appendPrefix + col)) {
+      LOG(ERROR) << "Column:" << (appendPrefix + col) << " is not found for " << result->Row();
+      return false;
+    }
+    l = *(result->Value(family, appendPrefix + col));
+    if (l != col) {
+      LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col;
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool Verify(std::shared_ptr<hbase::Result> result, const std::string &row,
+            const std::vector<std::string> &families) {
+  if (result == nullptr || result->IsEmpty()) {
+    LOG(ERROR) << "didn't get result";
     return false;
   }
+  if (result->Row().compare(row) != 0) {
+    LOG(ERROR) << "row " << result->Row() << " is not the expected: " << row;
+    return false;
+  }
+  // Test the values
+  for (auto family : families) {
+    if (!result->Value(family, kNumColumn)) {
+      LOG(ERROR) << "Column:" << kNumColumn << " is not found for " << result->Row();
+      return false;
+    }
+    auto cols = std::stoi(*(result->Value(family, kNumColumn)));
+    VLOG(3) << "Result for row:" << row << " contains " << std::to_string(cols) << " columns";
+    for (int m = 1; m <= cols; m++) {
+      if (!Verify(result, family, m)) return false;
+    }
+  }
   return true;
 }
 
@@ -95,35 +137,46 @@ bool DoScan(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Tabl
   auto end = start + rows;
   auto width = std::to_string(max_row).length();
   scan.SetStartRow(PrefixZero(width, start));
-  scan.SetStopRow(PrefixZero(width, end));
+  if (end != max_row && end != max_row + 1) {
+    scan.SetStopRow(PrefixZero(width, end));
+  }
+
+  auto start_ns = TimeUtil::GetNowNanos();
   auto scanner = table->Scan(scan);
 
-  auto cnt = start;
+  auto cnt = 0;
   auto r = scanner->Next();
   while (r != nullptr) {
-    auto row = PrefixZero(width, cnt);
-    if (r->Row().compare(row) != 0) {
-      LOG(ERROR) << "row " << r->Row() << " is not the expected: " << row;
+    auto row = PrefixZero(width, start + cnt);
+    if (!Verify(r, row, families)) {
       return false;
     }
-    for (auto family : families) {
-      auto cols = std::stoi(*(r->Value(family, kNumColumn)));
-      VLOG(3) << "scan gets " << std::to_string(cols) << " columns";
-      for (int m = 1; m <= cols; m++) {
-        if (!Verify(r, family, m)) return false;
-      }
-    }
     cnt++;
     r = scanner->Next();
+    if (cnt != 0 && cnt % FLAGS_report_num_rows == 0) {
+      LOG(INFO) << "(Thread " << iteration << ") "
+                << "Scan iterated over " << cnt << " results in "
+                << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    }
+  }
+  if (cnt != rows) {
+    LOG(ERROR) << "(Thread " << iteration << ") "
+               << "Expected number of results does not match. expected:" << rows
+               << ", actual:" << cnt;
+    return false;
   }
-  LOG(INFO) << "Iteration " << iteration << " scanned " << std::to_string(cnt - start) << " rows";
+  LOG(INFO) << "(Thread " << iteration << ") "
+            << "scanned " << std::to_string(cnt) << " rows in " << TimeUtil::ElapsedMillis(start_ns)
+            << " ms.";
   return true;
 }
 
 bool DoGet(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table> table,
-           const std::vector<std::string> &families, int batch_num_rows) {
+           const std::vector<std::string> &families, uint64_t batch_num_rows) {
   auto width = std::to_string(max_row).length();
+  auto start_ns = TimeUtil::GetNowNanos();
   for (uint64_t k = iteration; k <= max_row;) {
+    uint64_t total_read = 0;
     std::vector<hbase::Get> gets;
     for (uint64_t i = 0; i < batch_num_rows && k <= max_row; ++i, k += FLAGS_threads) {
       std::string row = PrefixZero(width, k);
@@ -132,29 +185,34 @@ bool DoGet(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table
     }
     VLOG(3) << "getting for " << batch_num_rows << " rows";
     auto results = table->Get(gets);
+    if (results.size() != gets.size()) {
+      LOG(ERROR) << "(Thread " << iteration << ") "
+                 << "Expected number of results does not match. expected:" << gets.size()
+                 << ", actual:" << results.size();
+      return false;
+    }
     for (uint64_t i = 0; i < batch_num_rows && i < results.size(); ++i) {
-      auto result = results[i];
-      if (result == nullptr) {
-        LOG(ERROR) << "didn't get result";
+      if (!Verify(results[i], gets[i].row(), families)) {
         return false;
       }
-      // Test the values
-      for (auto family : families) {
-        auto cols = std::stoi(*(result->Value(family, kNumColumn)));
-        VLOG(3) << "gets " << std::to_string(cols) << " columns";
-        for (int m = 1; m <= cols; m++) {
-          if (!Verify(result, family, m)) return false;
-        }
-      }
+    }
+    total_read += gets.size();
+    if (total_read != 0 && total_read % FLAGS_report_num_rows == 0) {
+      LOG(INFO) << "(Thread " << iteration << ") "
+                << "Sent  " << total_read << " Multi-Get requests in "
+                << TimeUtil::ElapsedMillis(start_ns) << " ms.";
     }
     k += batch_num_rows;
   }
-  LOG(INFO) << "Sent " << rows << " gets in iteration " << iteration;
+  LOG(INFO) << "(Thread " << iteration << ") "
+            << "Sent " << rows << " gets"
+            << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   return true;
 }
 
 void DoPut(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique_ptr<Table> table,
            const std::vector<std::string> &families) {
+  auto start_ns = TimeUtil::GetNowNanos();
   auto width = std::to_string(max_row).length();
   for (uint64_t j = 0; j < rows; j++) {
     std::string row = PrefixZero(width, iteration * rows + j);
@@ -167,15 +225,20 @@ void DoPut(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique
       }
     }
     table->Put(put);
-    if ((j + 1) % FLAGS_report_num_rows == 0)
-      LOG(INFO) << "Written " << std::to_string(j + 1) << " rows";
+    if ((j + 1) % FLAGS_report_num_rows == 0) {
+      LOG(INFO) << "(Thread " << iteration << ") "
+                << "Written " << std::to_string(j + 1) << " rows in "
+                << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    }
   }
-  LOG(INFO) << "written " << std::to_string(rows) << " rows in " << std::to_string(iteration)
-            << " iteration";
+  LOG(INFO) << "(Thread " << iteration << ") "
+            << "written " << std::to_string(rows) << " rows"
+            << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
 }
 
 bool DoAppendIncrement(int iteration, uint64_t max_row, uint64_t rows, int cols,
                        std::unique_ptr<Table> table, const std::vector<std::string> &families) {
+  auto start_ns = TimeUtil::GetNowNanos();
   auto width = std::to_string(max_row).length();
   for (uint64_t j = 0; j < rows; j++) {
     std::string row = PrefixZero(width, iteration * rows + j);
@@ -188,20 +251,26 @@ bool DoAppendIncrement(int iteration, uint64_t max_row, uint64_t rows, int cols,
             hbase::Increment{row}.AddColumn(family, incrPrefix + std::to_string(k), k));
         if (!table->Append(hbase::Append{row}.Add(family, appendPrefix + std::to_string(k),
                                                   std::to_string(k)))) {
-          LOG(ERROR) << "append for " << row << " family: " << family << " failed";
+          LOG(ERROR) << "(Thread " << iteration << ") "
+                     << "append for " << row << " family: " << family << " failed";
           return false;
         }
       }
     }
     if ((j + 1) % FLAGS_report_num_rows == 0)
-      LOG(INFO) << "Written " << std::to_string(j + 1) << " increments";
+      LOG(INFO) << "(Thread " << iteration << ") "
+                << "Written " << std::to_string(j + 1) << " increments"
+                << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   }
+  LOG(INFO) << "(Thread " << iteration << ") "
+            << "written " << std::to_string(rows) << " increments"
+            << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   return true;
 }
 
 int main(int argc, char *argv[]) {
-  google::SetUsageMessage("Load client to manipulate multiple rows from HBase on the comamnd line");
-  google::ParseCommandLineFlags(&argc, &argv, true);
+  gflags::SetUsageMessage("Load client to manipulate multiple rows from HBase on the comamnd line");
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
   google::InstallFailureSignalHandler();
   FLAGS_logtostderr = 1;
@@ -244,94 +313,78 @@ int main(int argc, char *argv[]) {
   int rows = FLAGS_num_rows / FLAGS_threads;
   if (FLAGS_num_rows % FLAGS_threads != 0) rows++;
   int cols = FLAGS_num_cols;
-  bool succeeded = true;
+  std::atomic<int8_t> succeeded{1};  // not using bool since we want atomic &=
   if (FLAGS_puts) {
+    LOG(INFO) << "Sending put requests";
     auto start_ns = TimeUtil::GetNowNanos();
     std::vector<std::thread> writer_threads;
     for (int i = 0; i < FLAGS_threads; i++) {
-      writer_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] {
-        // Get connection to HBase Table
+      writer_threads.push_back(std::thread([&, i] {
         auto table = client->Table(*tn);
-
         DoPut(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families);
-        table->Close();
       }));
     }
-    for (std::vector<std::thread>::iterator it = writer_threads.begin(); it != writer_threads.end();
-         it++) {
-      std::thread thread = std::move(*it);
-      thread.join();
+    for (auto &t : writer_threads) {
+      t.join();
     }
     LOG(INFO) << "Successfully sent  " << num_puts << " Put requests in "
               << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   }
   if (FLAGS_appends) {
+    LOG(INFO) << "Sending append/increment requests";
     auto start_ns = TimeUtil::GetNowNanos();
     std::vector<std::thread> writer_threads;
     for (int i = 0; i < FLAGS_threads; i++) {
-      writer_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] {
-        // Get connection to HBase Table
+      writer_threads.push_back(std::thread([&, i] {
         auto table = client->Table(*tn);
-
         succeeded &=
             DoAppendIncrement(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families);
-        table->Close();
       }));
     }
-    for (std::vector<std::thread>::iterator it = writer_threads.begin(); it != writer_threads.end();
-         it++) {
-      std::thread thread = std::move(*it);
-      thread.join();
+    for (auto &t : writer_threads) {
+      t.join();
     }
     LOG(INFO) << "Successfully sent  " << num_puts << " append requests in "
               << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   }
 
   if (FLAGS_scans) {
+    LOG(INFO) << "Sending scan requests";
     auto start_ns = TimeUtil::GetNowNanos();
     std::vector<std::thread> reader_threads;
     for (int i = 0; i < FLAGS_threads; i++) {
-      reader_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] {
-        // Get connection to HBase Table
+      reader_threads.push_back(std::thread([&, i] {
         auto table1 = client->Table(*tn);
-
         succeeded &= DoScan(i, FLAGS_num_rows - 1, rows, std::move(table1), families);
-        table1->Close();
       }));
     }
-    for (std::vector<std::thread>::iterator it = reader_threads.begin(); it != reader_threads.end();
-         it++) {
-      std::thread thread = std::move(*it);
-      thread.join();
+    for (auto &t : reader_threads) {
+      t.join();
     }
 
-    LOG(INFO) << (succeeded ? "Successful. " : "Failed. ") << "Spent "
-              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    LOG(INFO) << (succeeded.load() ? "Successfully " : "Failed. ") << " scannned " << num_puts
+              << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   }
 
   if (FLAGS_gets) {
+    LOG(INFO) << "Sending get requests";
     auto start_ns = TimeUtil::GetNowNanos();
     std::vector<std::thread> reader_threads;
     for (int i = 0; i < FLAGS_threads; i++) {
-      reader_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] {
-        // Get connection to HBase Table
+      reader_threads.push_back(std::thread([&, i] {
         auto table1 = client->Table(*tn);
-
         succeeded &=
             DoGet(i, FLAGS_num_rows - 1, rows, std::move(table1), families, FLAGS_batch_num_rows);
-        table1->Close();
       }));
     }
-    for (std::vector<std::thread>::iterator it = reader_threads.begin(); it != reader_threads.end();
-         it++) {
-      std::thread thread = std::move(*it);
-      thread.join();
+    for (auto &t : reader_threads) {
+      t.join();
     }
 
-    LOG(INFO) << (succeeded ? "Successful. " : "Failed. ") << "Spent "
-              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    LOG(INFO) << (succeeded.load() ? "Successful. " : "Failed. ") << " sent multi-get requests for "
+              << num_puts << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
   }
   client->Close();
 
-  return succeeded ? 0 : -1;
+  return succeeded.load() ? 0 : -1;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5643e86/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index d36689e..6730248 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -82,8 +82,8 @@ void ValidateResult(const Result &result, const std::string &row) {
 }
 
 int main(int argc, char *argv[]) {
-  google::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line");
-  google::ParseCommandLineFlags(&argc, &argv, true);
+  gflags::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line");
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
   google::InstallFailureSignalHandler();
   FLAGS_logtostderr = 1;