You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/09/02 20:57:13 UTC
kudu git commit: [benchmarks/tpch] introduced AUTO_FLUSH_BACKGROUND mode
Repository: kudu
Updated Branches:
refs/heads/master efb60241b -> 4d5fb0d3d
[benchmarks/tpch] introduced AUTO_FLUSH_BACKGROUND mode
Added the ability to run KuduSession in AUTO_FLUSH_BACKGROUND mode
when running TPC-H benchmarks. Also did some other minor code clean-up,
such as reordering method implementations to match their declaration
order.
Change-Id: I69d6897e9d1126270f2dc8b7d913d37e73428c1f
Reviewed-on: http://gerrit.cloudera.org:8080/4024
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d5fb0d3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d5fb0d3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d5fb0d3
Branch: refs/heads/master
Commit: 4d5fb0d3dfbb611c768d19e1ed67ee5d18668feb
Parents: efb6024
Author: Alexey Serbin <as...@cloudera.com>
Authored: Wed Aug 17 19:41:40 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Sep 2 20:56:56 2016 +0000
----------------------------------------------------------------------
src/kudu/benchmarks/tpch/rpc_line_item_dao.cc | 113 +++++++++++----------
src/kudu/benchmarks/tpch/rpc_line_item_dao.h | 26 ++---
src/kudu/benchmarks/tpch/tpch1.cc | 4 +-
src/kudu/benchmarks/tpch/tpch_real_world.cc | 38 ++++---
4 files changed, 99 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc b/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
index f76af5b..ab1b357 100644
--- a/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
+++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc
@@ -75,7 +75,7 @@ class FlushCallback : public KuduStatusCallback {
private:
void BatchFinished() {
int nerrs = session_->CountPendingErrors();
- if (nerrs) {
+ if (nerrs > 0) {
LOG(WARNING) << nerrs << " errors occured during last batch.";
vector<KuduError*> errors;
ElementDeleter d(&errors);
@@ -98,6 +98,22 @@ class FlushCallback : public KuduStatusCallback {
const Slice RpcLineItemDAO::kScanUpperBound = Slice("1998-09-02");
+// Wraps up any outstanding writes (by delegating to FinishWriting()) before
+// the DAO and its session are torn down.
+RpcLineItemDAO::~RpcLineItemDAO() {
+  this->FinishWriting();
+}
+
+// Only records the arguments; the session is created later by Init().
+// timeout_ms becomes the session timeout that Init() sets.
+// A positive batch_op_num_max makes Init() choose MANUAL_FLUSH, and
+// HandleLine() then flushes after every batch_op_num_max applied operations;
+// any other value makes Init() choose AUTO_FLUSH_BACKGROUND.
+RpcLineItemDAO::RpcLineItemDAO(string master_address, string table_name,
+ int batch_op_num_max, int timeout_ms,
+ vector<const KuduPartialRow*> tablet_splits)
+ : master_address_(std::move(master_address)),
+ table_name_(std::move(table_name)),
+ timeout_(MonoDelta::FromMilliseconds(timeout_ms)),
+ batch_op_num_max_(batch_op_num_max),
+ tablet_splits_(std::move(tablet_splits)),
+ batch_op_num_(0),
+ // Restricts the DAO to one outstanding flush batch at a time.
+ semaphore_(1) {
+}
+
void RpcLineItemDAO::Init() {
const KuduSchema schema = tpch::CreateLineItemSchema();
@@ -121,32 +137,23 @@ void RpcLineItemDAO::Init() {
session_ = client_->NewSession();
session_->SetTimeoutMillis(timeout_.ToMilliseconds());
- CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ CHECK_OK(session_->SetFlushMode(batch_op_num_max_ > 0
+ ? KuduSession::MANUAL_FLUSH
+ : KuduSession::AUTO_FLUSH_BACKGROUND));
}
+// Fills a new INSERT's row via `f`, hands the operation to the session, and
+// then calls HandleLine() for the per-operation batching/flush bookkeeping.
void RpcLineItemDAO::WriteLine(boost::function<void(KuduPartialRow*)> f) {
gscoped_ptr<KuduInsert> insert(client_table_->NewInsert());
f(insert->mutable_row());
CHECK_OK(session_->Apply(insert.release()));
- ++batch_size_;
- FlushIfBufferFull();
-}
-
-void RpcLineItemDAO::FlushIfBufferFull() {
- if (batch_size_ < batch_max_) return;
-
- batch_size_ = 0;
-
- // The callback object frees itself after it is invoked.
- session_->FlushAsync(new FlushCallback(session_, &semaphore_));
+ HandleLine();
}
+// Fills a new UPDATE's row via `f`, hands the operation to the session, and
+// then calls HandleLine() for the per-operation batching/flush bookkeeping.
void RpcLineItemDAO::MutateLine(boost::function<void(KuduPartialRow*)> f) {
gscoped_ptr<KuduUpdate> update(client_table_->NewUpdate());
f(update->mutable_row());
CHECK_OK(session_->Apply(update.release()));
- ++batch_size_;
- FlushIfBufferFull();
+ HandleLine();
}
void RpcLineItemDAO::FinishWriting() {
@@ -160,21 +167,7 @@ void RpcLineItemDAO::FinishWriting() {
+// Opens a scanner projecting `columns` with an empty predicate list.
void RpcLineItemDAO::OpenScanner(const vector<string>& columns,
gscoped_ptr<Scanner>* out_scanner) {
vector<KuduPredicate*> preds;
- OpenScanner(columns, preds, out_scanner);
-}
-
-void RpcLineItemDAO::OpenScanner(const vector<string>& columns,
- const vector<KuduPredicate*>& preds,
- gscoped_ptr<Scanner>* out_scanner) {
- gscoped_ptr<Scanner> ret(new Scanner);
- ret->scanner_.reset(new KuduScanner(client_table_.get()));
- ret->scanner_->SetCacheBlocks(FLAGS_tpch_cache_blocks_when_scanning);
- CHECK_OK(ret->scanner_->SetProjectedColumns(columns));
- for (KuduPredicate* pred : preds) {
- CHECK_OK(ret->scanner_->AddConjunctPredicate(pred));
- }
- CHECK_OK(ret->scanner_->Open());
- out_scanner->swap(ret);
+ OpenScannerImpl(columns, preds, out_scanner);
}
void RpcLineItemDAO::OpenTpch1Scanner(gscoped_ptr<Scanner>* out_scanner) {
@@ -182,7 +175,7 @@ void RpcLineItemDAO::OpenTpch1Scanner(gscoped_ptr<Scanner>* out_scanner) {
preds.push_back(client_table_->NewComparisonPredicate(
tpch::kShipDateColName, KuduPredicate::LESS_EQUAL,
KuduValue::CopyString(kScanUpperBound)));
- OpenScanner(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
+ OpenScannerImpl(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
}
void RpcLineItemDAO::OpenTpch1ScannerForOrderKeyRange(int64_t min_key, int64_t max_key,
@@ -197,7 +190,41 @@ void RpcLineItemDAO::OpenTpch1ScannerForOrderKeyRange(int64_t min_key, int64_t m
preds.push_back(client_table_->NewComparisonPredicate(
tpch::kOrderKeyColName, KuduPredicate::LESS_EQUAL,
KuduValue::FromInt(max_key)));
- OpenScanner(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
+ OpenScannerImpl(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
+}
+
+// Opens an unconstrained scanner over the table and reports emptiness based
+// on KuduScanner::HasMoreRows() right after Open().
+bool RpcLineItemDAO::IsTableEmpty() {
+  KuduScanner probe(client_table_.get());
+  CHECK_OK(probe.Open());
+  const bool has_rows = probe.HasMoreRows();
+  return !has_rows;
+}
+
+// Shared scanner factory: builds a KuduScanner over client_table_ that
+// projects `columns`, adds every entry of `preds` as a conjunct predicate,
+// opens it, and hands the wrapping Scanner back through `out_scanner`.
+// Whatever `out_scanner` held before is swapped into the local holder and
+// destroyed when this function returns.
+void RpcLineItemDAO::OpenScannerImpl(const vector<string>& columns,
+                                     const vector<KuduPredicate*>& preds,
+                                     gscoped_ptr<Scanner>* out_scanner) {
+  gscoped_ptr<Scanner> fresh(new Scanner);
+  fresh->scanner_.reset(new KuduScanner(client_table_.get()));
+  fresh->scanner_->SetCacheBlocks(FLAGS_tpch_cache_blocks_when_scanning);
+  CHECK_OK(fresh->scanner_->SetProjectedColumns(columns));
+  for (size_t idx = 0; idx < preds.size(); ++idx) {
+    CHECK_OK(fresh->scanner_->AddConjunctPredicate(preds[idx]));
+  }
+  CHECK_OK(fresh->scanner_->Open());
+  out_scanner->swap(fresh);
+}
+
+// Per-operation bookkeeping called after every applied write.
+// With a positive batch_op_num_max_ the session is in MANUAL_FLUSH mode, and
+// this issues an asynchronous flush once batch_op_num_max_ operations have
+// been applied since the previous flush. Otherwise the session was opened in
+// AUTO_FLUSH_BACKGROUND mode by Init(), and flushing is left to the session.
+void RpcLineItemDAO::HandleLine() {
+  // Mirror the flush-mode selection in Init() (MANUAL_FLUSH only when
+  // batch_op_num_max_ > 0). Checking just for == 0 would let a negative limit
+  // fall through below, where the counter check always fails and a
+  // FlushAsync() would be issued for every single operation.
+  if (batch_op_num_max_ <= 0) {
+    return;
+  }
+  if (++batch_op_num_ < batch_op_num_max_) {
+    return;
+  }
+  batch_op_num_ = 0;
+  // The callback object frees itself after it is invoked.
+  session_->FlushAsync(new FlushCallback(session_, &semaphore_));
+}
bool RpcLineItemDAO::Scanner::HasMore() {
@@ -212,26 +239,4 @@ void RpcLineItemDAO::Scanner::GetNext(vector<KuduRowResult> *rows) {
CHECK_OK(scanner_->NextBatch(rows));
}
-bool RpcLineItemDAO::IsTableEmpty() {
- KuduScanner scanner(client_table_.get());
- CHECK_OK(scanner.Open());
- return !scanner.HasMoreRows();
-}
-
-RpcLineItemDAO::~RpcLineItemDAO() {
- FinishWriting();
-}
-
-RpcLineItemDAO::RpcLineItemDAO(string master_address, string table_name,
- int batch_size, int mstimeout,
- vector<const KuduPartialRow*> tablet_splits)
- : master_address_(std::move(master_address)),
- table_name_(std::move(table_name)),
- timeout_(MonoDelta::FromMilliseconds(mstimeout)),
- batch_max_(batch_size),
- tablet_splits_(std::move(tablet_splits)),
- batch_size_(0),
- semaphore_(1) {
-}
-
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao.h b/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
index b29eef2..98924d6 100644
--- a/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
+++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao.h
@@ -38,13 +38,13 @@ class RpcLineItemDAO {
RpcLineItemDAO(std::string master_address,
std::string table_name,
- int batch_size,
- int mstimeout = 5000,
+ int batch_op_num_max,
+ int timeout_ms = 5000,
std::vector<const KuduPartialRow*> tablet_splits = {});
~RpcLineItemDAO();
+ void Init();
void WriteLine(boost::function<void(KuduPartialRow*)> f);
void MutateLine(boost::function<void(KuduPartialRow*)> f);
- void Init();
void FinishWriting();
// Deletes previous scanner if one is open.
@@ -82,21 +82,21 @@ class RpcLineItemDAO {
private:
static const Slice kScanUpperBound;
- void FlushIfBufferFull();
- void OpenScanner(const std::vector<std::string>& columns,
- const std::vector<client::KuduPredicate*>& preds,
- gscoped_ptr<Scanner>* scanner);
+ void OpenScannerImpl(const std::vector<std::string>& columns,
+ const std::vector<client::KuduPredicate*>& preds,
+ gscoped_ptr<Scanner>* scanner);
+ void HandleLine();
- simple_spinlock lock_;
- client::sp::shared_ptr<client::KuduClient> client_;
- client::sp::shared_ptr<client::KuduSession> session_;
- client::sp::shared_ptr<client::KuduTable> client_table_;
const std::string master_address_;
const std::string table_name_;
const MonoDelta timeout_;
- const int batch_max_;
+ const int batch_op_num_max_;
const std::vector<const KuduPartialRow*> tablet_splits_;
- int batch_size_;
+ int batch_op_num_;
+ simple_spinlock lock_;
+ client::sp::shared_ptr<client::KuduClient> client_;
+ client::sp::shared_ptr<client::KuduSession> session_;
+ client::sp::shared_ptr<client::KuduTable> client_table_;
// Semaphore which restricts us to one batch at a time.
Semaphore semaphore_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/tpch1.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/tpch1.cc b/src/kudu/benchmarks/tpch/tpch1.cc
index 91d22b3..05c7b32 100644
--- a/src/kudu/benchmarks/tpch/tpch1.cc
+++ b/src/kudu/benchmarks/tpch/tpch1.cc
@@ -89,7 +89,9 @@ DEFINE_string(mini_cluster_base_dir, "/tmp/tpch",
DEFINE_string(master_address, "localhost",
"Address of master for the cluster to operate on");
DEFINE_int32(tpch_max_batch_size, 1000,
- "Maximum number of inserts/updates to batch at once");
+             "Maximum number of inserts/updates to batch at once. Set to 0 "
+             "to delegate the batching control to the logic of the "
+             "KuduSession running in AUTO_FLUSH_BACKGROUND flush mode.");
DEFINE_string(table_name, "lineitem",
"The table name to write/read");
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/tpch_real_world.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/tpch_real_world.cc b/src/kudu/benchmarks/tpch/tpch_real_world.cc
index e161ded..3a622a2 100644
--- a/src/kudu/benchmarks/tpch/tpch_real_world.cc
+++ b/src/kudu/benchmarks/tpch/tpch_real_world.cc
@@ -38,13 +38,14 @@
// insert, so the last timing shouldn't be used.
//
// TODO Make the inserts multi-threaded. See Kudu-629 for the technique.
-#include <boost/bind.hpp>
-#include <glog/logging.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+
#include "kudu/benchmarks/tpch/line_item_tsv_importer.h"
#include "kudu/benchmarks/tpch/rpc_line_item_dao.h"
#include "kudu/benchmarks/tpch/tpch-schemas.h"
@@ -69,7 +70,9 @@ DEFINE_bool(tpch_load_data, true,
DEFINE_bool(tpch_run_queries, true,
"Query dbgen data as it is inserted");
DEFINE_int32(tpch_max_batch_size, 1000,
- "Maximum number of inserts to batch at once");
+             "Maximum number of inserts/updates to batch at once. Set to 0 "
+             "to delegate the batching control to the logic of the "
+             "KuduSession running in AUTO_FLUSH_BACKGROUND flush mode.");
DEFINE_int32(tpch_test_client_timeout_msec, 10000,
"Timeout that will be used for all operations and RPCs");
DEFINE_int32(tpch_test_runtime_sec, 0,
@@ -250,11 +253,12 @@ gscoped_ptr<RpcLineItemDAO> TpchRealWorld::GetInittedDAO() {
split_rows.push_back(row);
}
- gscoped_ptr<RpcLineItemDAO> dao(new RpcLineItemDAO(master_addresses_,
- FLAGS_tpch_table_name,
- FLAGS_tpch_max_batch_size,
- FLAGS_tpch_test_client_timeout_msec,
- split_rows));
+ gscoped_ptr<RpcLineItemDAO> dao(
+ new RpcLineItemDAO(master_addresses_,
+ FLAGS_tpch_table_name,
+ FLAGS_tpch_max_batch_size,
+ FLAGS_tpch_test_client_timeout_msec,
+ split_rows));
dao->Init();
return std::move(dao);
}
@@ -266,14 +270,20 @@ void TpchRealWorld::LoadLineItemsThread(int i) {
boost::function<void(KuduPartialRow*)> f =
boost::bind(&LineItemTsvImporter::GetNextLine, &importer, _1);
- while (importer.HasNextLine() && !stop_threads_.Load()) {
- dao->WriteLine(f);
- int64_t current_count = rows_inserted_.Increment();
- if (current_count % 250000 == 0) {
- LOG(INFO) << "Inserted " << current_count << " rows";
+ const string time_spent_msg = Substitute(
+ "by thread $0 to load generated data into the database", i);
+ LOG_TIMING(INFO, time_spent_msg) {
+ while (importer.HasNextLine() && !stop_threads_.Load()) {
+ dao->WriteLine(f);
+ int64_t current_count = rows_inserted_.Increment();
+ if (current_count % 250000 == 0) {
+ LOG(INFO) << "Inserted " << current_count << " rows";
+ }
}
+ dao->FinishWriting();
}
- dao->FinishWriting();
+ LOG(INFO) << Substitute("Thread $0 inserted ", i)
+ << rows_inserted_.Load() << " rows in total";
}
void TpchRealWorld::MonitorDbgenThread(int i) {