You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/09/02 20:57:13 UTC

kudu git commit: [benchmarks/tpch] introduced AUTO_FLUSH_BACKGROUND mode

Repository: kudu
Updated Branches:
  refs/heads/master efb60241b -> 4d5fb0d3d


[benchmarks/tpch] introduced AUTO_FLUSH_BACKGROUND mode

Added an ability to run KuduSession in AUTO_FLUSH_BACKGROUND mode
while running TPC-H benchmarks.  Also did other minor code clean-up
like re-ordering implementation of methods to match their declaration
order, etc.

Change-Id: I69d6897e9d1126270f2dc8b7d913d37e73428c1f
Reviewed-on: http://gerrit.cloudera.org:8080/4024
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d5fb0d3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d5fb0d3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d5fb0d3

Branch: refs/heads/master
Commit: 4d5fb0d3dfbb611c768d19e1ed67ee5d18668feb
Parents: efb6024
Author: Alexey Serbin <as...@cloudera.com>
Authored: Wed Aug 17 19:41:40 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Sep 2 20:56:56 2016 +0000

----------------------------------------------------------------------
 src/kudu/benchmarks/tpch/rpc_line_item_dao.cc | 113 +++++++++++----------
 src/kudu/benchmarks/tpch/rpc_line_item_dao.h  |  26 ++---
 src/kudu/benchmarks/tpch/tpch1.cc             |   4 +-
 src/kudu/benchmarks/tpch/tpch_real_world.cc   |  38 ++++---
 4 files changed, 99 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc b/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
index f76af5b..ab1b357 100644
--- a/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
+++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
@@ -75,7 +75,7 @@ class FlushCallback : public KuduStatusCallback {
  private:
   void BatchFinished() {
     int nerrs = session_->CountPendingErrors();
-    if (nerrs) {
+    if (nerrs > 0) {
       LOG(WARNING) << nerrs << " errors occured during last batch.";
       vector<KuduError*> errors;
       ElementDeleter d(&errors);
@@ -98,6 +98,22 @@ class FlushCallback : public KuduStatusCallback {
 
 const Slice RpcLineItemDAO::kScanUpperBound = Slice("1998-09-02");
 
+RpcLineItemDAO::~RpcLineItemDAO() {
+  FinishWriting();
+}
+
+RpcLineItemDAO::RpcLineItemDAO(string master_address, string table_name,
+                               int batch_op_num_max, int timeout_ms,
+                               vector<const KuduPartialRow*> tablet_splits)
+    : master_address_(std::move(master_address)),
+      table_name_(std::move(table_name)),
+      timeout_(MonoDelta::FromMilliseconds(timeout_ms)),
+      batch_op_num_max_(batch_op_num_max),
+      tablet_splits_(std::move(tablet_splits)),
+      batch_op_num_(0),
+      semaphore_(1) {
+}
+
 void RpcLineItemDAO::Init() {
   const KuduSchema schema = tpch::CreateLineItemSchema();
 
@@ -121,32 +137,23 @@ void RpcLineItemDAO::Init() {
 
   session_ = client_->NewSession();
   session_->SetTimeoutMillis(timeout_.ToMilliseconds());
-  CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  CHECK_OK(session_->SetFlushMode(batch_op_num_max_ > 0
+                                  ? KuduSession::MANUAL_FLUSH
+                                  : KuduSession::AUTO_FLUSH_BACKGROUND));
 }
 
 void RpcLineItemDAO::WriteLine(boost::function<void(KuduPartialRow*)> f) {
   gscoped_ptr<KuduInsert> insert(client_table_->NewInsert());
   f(insert->mutable_row());
   CHECK_OK(session_->Apply(insert.release()));
-  ++batch_size_;
-  FlushIfBufferFull();
-}
-
-void RpcLineItemDAO::FlushIfBufferFull() {
-  if (batch_size_ < batch_max_) return;
-
-  batch_size_ = 0;
-
-  // The callback object frees itself after it is invoked.
-  session_->FlushAsync(new FlushCallback(session_, &semaphore_));
+  HandleLine();
 }
 
 void RpcLineItemDAO::MutateLine(boost::function<void(KuduPartialRow*)> f) {
   gscoped_ptr<KuduUpdate> update(client_table_->NewUpdate());
   f(update->mutable_row());
   CHECK_OK(session_->Apply(update.release()));
-  ++batch_size_;
-  FlushIfBufferFull();
+  HandleLine();
 }
 
 void RpcLineItemDAO::FinishWriting() {
@@ -160,21 +167,7 @@ void RpcLineItemDAO::FinishWriting() {
 void RpcLineItemDAO::OpenScanner(const vector<string>& columns,
                                  gscoped_ptr<Scanner>* out_scanner) {
   vector<KuduPredicate*> preds;
-  OpenScanner(columns, preds, out_scanner);
-}
-
-void RpcLineItemDAO::OpenScanner(const vector<string>& columns,
-                                 const vector<KuduPredicate*>& preds,
-                                 gscoped_ptr<Scanner>* out_scanner) {
-  gscoped_ptr<Scanner> ret(new Scanner);
-  ret->scanner_.reset(new KuduScanner(client_table_.get()));
-  ret->scanner_->SetCacheBlocks(FLAGS_tpch_cache_blocks_when_scanning);
-  CHECK_OK(ret->scanner_->SetProjectedColumns(columns));
-  for (KuduPredicate* pred : preds) {
-    CHECK_OK(ret->scanner_->AddConjunctPredicate(pred));
-  }
-  CHECK_OK(ret->scanner_->Open());
-  out_scanner->swap(ret);
+  OpenScannerImpl(columns, preds, out_scanner);
 }
 
 void RpcLineItemDAO::OpenTpch1Scanner(gscoped_ptr<Scanner>* out_scanner) {
@@ -182,7 +175,7 @@ void RpcLineItemDAO::OpenTpch1Scanner(gscoped_ptr<Scanner>* out_scanner) {
   preds.push_back(client_table_->NewComparisonPredicate(
                       tpch::kShipDateColName, KuduPredicate::LESS_EQUAL,
                       KuduValue::CopyString(kScanUpperBound)));
-  OpenScanner(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
+  OpenScannerImpl(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
 }
 
 void RpcLineItemDAO::OpenTpch1ScannerForOrderKeyRange(int64_t min_key, int64_t max_key,
@@ -197,7 +190,41 @@ void RpcLineItemDAO::OpenTpch1ScannerForOrderKeyRange(int64_t min_key, int64_t m
   preds.push_back(client_table_->NewComparisonPredicate(
                       tpch::kOrderKeyColName, KuduPredicate::LESS_EQUAL,
                       KuduValue::FromInt(max_key)));
-  OpenScanner(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
+  OpenScannerImpl(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
+}
+
+bool RpcLineItemDAO::IsTableEmpty() {
+  KuduScanner scanner(client_table_.get());
+  CHECK_OK(scanner.Open());
+  return !scanner.HasMoreRows();
+}
+
+void RpcLineItemDAO::OpenScannerImpl(const vector<string>& columns,
+                                     const vector<KuduPredicate*>& preds,
+                                     gscoped_ptr<Scanner>* out_scanner) {
+  gscoped_ptr<Scanner> ret(new Scanner);
+  ret->scanner_.reset(new KuduScanner(client_table_.get()));
+  ret->scanner_->SetCacheBlocks(FLAGS_tpch_cache_blocks_when_scanning);
+  CHECK_OK(ret->scanner_->SetProjectedColumns(columns));
+  for (KuduPredicate* pred : preds) {
+    CHECK_OK(ret->scanner_->AddConjunctPredicate(pred));
+  }
+  CHECK_OK(ret->scanner_->Open());
+  out_scanner->swap(ret);
+}
+
+void RpcLineItemDAO::HandleLine() {
+  if (batch_op_num_max_ == 0) {
+    // Nothing to take care in this case because it is an AUTO_FLUSH_BACKGROUND
+    // session.
+    return;
+  }
+  if (++batch_op_num_ < batch_op_num_max_) {
+    return;
+  }
+  batch_op_num_ = 0;
+  // The callback object frees itself after it is invoked.
+  session_->FlushAsync(new FlushCallback(session_, &semaphore_));
 }
 
 bool RpcLineItemDAO::Scanner::HasMore() {
@@ -212,26 +239,4 @@ void RpcLineItemDAO::Scanner::GetNext(vector<KuduRowResult> *rows) {
   CHECK_OK(scanner_->NextBatch(rows));
 }
 
-bool RpcLineItemDAO::IsTableEmpty() {
-  KuduScanner scanner(client_table_.get());
-  CHECK_OK(scanner.Open());
-  return !scanner.HasMoreRows();
-}
-
-RpcLineItemDAO::~RpcLineItemDAO() {
-  FinishWriting();
-}
-
-RpcLineItemDAO::RpcLineItemDAO(string master_address, string table_name,
-                               int batch_size, int mstimeout,
-                               vector<const KuduPartialRow*> tablet_splits)
-    : master_address_(std::move(master_address)),
-      table_name_(std::move(table_name)),
-      timeout_(MonoDelta::FromMilliseconds(mstimeout)),
-      batch_max_(batch_size),
-      tablet_splits_(std::move(tablet_splits)),
-      batch_size_(0),
-      semaphore_(1) {
-}
-
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao.h b/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
index b29eef2..98924d6 100644
--- a/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
+++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
@@ -38,13 +38,13 @@ class RpcLineItemDAO {
 
   RpcLineItemDAO(std::string master_address,
                  std::string table_name,
-                 int batch_size,
-                 int mstimeout = 5000,
+                 int batch_op_num_max,
+                 int timeout_ms = 5000,
                  std::vector<const KuduPartialRow*> tablet_splits = {});
   ~RpcLineItemDAO();
+  void Init();
   void WriteLine(boost::function<void(KuduPartialRow*)> f);
   void MutateLine(boost::function<void(KuduPartialRow*)> f);
-  void Init();
   void FinishWriting();
 
   // Deletes previous scanner if one is open.
@@ -82,21 +82,21 @@ class RpcLineItemDAO {
  private:
   static const Slice kScanUpperBound;
 
-  void FlushIfBufferFull();
-  void OpenScanner(const std::vector<std::string>& columns,
-                   const std::vector<client::KuduPredicate*>& preds,
-                   gscoped_ptr<Scanner>* scanner);
+  void OpenScannerImpl(const std::vector<std::string>& columns,
+                       const std::vector<client::KuduPredicate*>& preds,
+                       gscoped_ptr<Scanner>* scanner);
+  void HandleLine();
 
-  simple_spinlock lock_;
-  client::sp::shared_ptr<client::KuduClient> client_;
-  client::sp::shared_ptr<client::KuduSession> session_;
-  client::sp::shared_ptr<client::KuduTable> client_table_;
   const std::string master_address_;
   const std::string table_name_;
   const MonoDelta timeout_;
-  const int batch_max_;
+  const int batch_op_num_max_;
   const std::vector<const KuduPartialRow*> tablet_splits_;
-  int batch_size_;
+  int batch_op_num_;
+  simple_spinlock lock_;
+  client::sp::shared_ptr<client::KuduClient> client_;
+  client::sp::shared_ptr<client::KuduSession> session_;
+  client::sp::shared_ptr<client::KuduTable> client_table_;
 
   // Semaphore which restricts us to one batch at a time.
   Semaphore semaphore_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/tpch1.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/tpch1.cc b/src/kudu/benchmarks/tpch/tpch1.cc
index 91d22b3..05c7b32 100644
--- a/src/kudu/benchmarks/tpch/tpch1.cc
+++ b/src/kudu/benchmarks/tpch/tpch1.cc
@@ -89,7 +89,9 @@ DEFINE_string(mini_cluster_base_dir, "/tmp/tpch",
 DEFINE_string(master_address, "localhost",
               "Address of master for the cluster to operate on");
 DEFINE_int32(tpch_max_batch_size, 1000,
-             "Maximum number of inserts/updates to batch at once");
+             "Maximum number of inserts/updates to batch at once.  Set to 0 "
+             "to delegate the batching control to the logic of the "
+             "KuduSession running in AUTO_BACKGROUND_MODE flush mode.");
 DEFINE_string(table_name, "lineitem",
               "The table name to write/read");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/tpch_real_world.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/tpch_real_world.cc b/src/kudu/benchmarks/tpch/tpch_real_world.cc
index e161ded..3a622a2 100644
--- a/src/kudu/benchmarks/tpch/tpch_real_world.cc
+++ b/src/kudu/benchmarks/tpch/tpch_real_world.cc
@@ -38,13 +38,14 @@
 //    insert, so the last timing shouldn't be used.
 //
 // TODO Make the inserts multi-threaded. See Kudu-629 for the technique.
-#include <boost/bind.hpp>
 
-#include <glog/logging.h>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+
 #include "kudu/benchmarks/tpch/line_item_tsv_importer.h"
 #include "kudu/benchmarks/tpch/rpc_line_item_dao.h"
 #include "kudu/benchmarks/tpch/tpch-schemas.h"
@@ -69,7 +70,9 @@ DEFINE_bool(tpch_load_data, true,
 DEFINE_bool(tpch_run_queries, true,
             "Query dbgen data as it is inserted");
 DEFINE_int32(tpch_max_batch_size, 1000,
-             "Maximum number of inserts to batch at once");
+             "Maximum number of inserts/updates to batch at once.  Set to 0 "
+             "to delegate the batching control to the logic of the "
+             "KuduSession running in AUTO_BACKGROUND_MODE flush mode.");
 DEFINE_int32(tpch_test_client_timeout_msec, 10000,
              "Timeout that will be used for all operations and RPCs");
 DEFINE_int32(tpch_test_runtime_sec, 0,
@@ -250,11 +253,12 @@ gscoped_ptr<RpcLineItemDAO> TpchRealWorld::GetInittedDAO() {
     split_rows.push_back(row);
   }
 
-  gscoped_ptr<RpcLineItemDAO> dao(new RpcLineItemDAO(master_addresses_,
-                                                     FLAGS_tpch_table_name,
-                                                     FLAGS_tpch_max_batch_size,
-                                                     FLAGS_tpch_test_client_timeout_msec,
-                                                     split_rows));
+  gscoped_ptr<RpcLineItemDAO> dao(
+        new RpcLineItemDAO(master_addresses_,
+                           FLAGS_tpch_table_name,
+                           FLAGS_tpch_max_batch_size,
+                           FLAGS_tpch_test_client_timeout_msec,
+                           split_rows));
   dao->Init();
   return std::move(dao);
 }
@@ -266,14 +270,20 @@ void TpchRealWorld::LoadLineItemsThread(int i) {
 
   boost::function<void(KuduPartialRow*)> f =
       boost::bind(&LineItemTsvImporter::GetNextLine, &importer, _1);
-  while (importer.HasNextLine() && !stop_threads_.Load()) {
-    dao->WriteLine(f);
-    int64_t current_count = rows_inserted_.Increment();
-    if (current_count % 250000 == 0) {
-      LOG(INFO) << "Inserted " << current_count << " rows";
+  const string time_spent_msg = Substitute(
+        "by thread $0 to load generated data into the database", i);
+  LOG_TIMING(INFO, time_spent_msg) {
+    while (importer.HasNextLine() && !stop_threads_.Load()) {
+      dao->WriteLine(f);
+      int64_t current_count = rows_inserted_.Increment();
+      if (current_count % 250000 == 0) {
+        LOG(INFO) << "Inserted " << current_count << " rows";
+      }
     }
+    dao->FinishWriting();
   }
-  dao->FinishWriting();
+  LOG(INFO) << Substitute("Thread $0 inserted ", i)
+            << rows_inserted_.Load() << " rows in total";
 }
 
 void TpchRealWorld::MonitorDbgenThread(int i) {