You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/09/14 14:52:18 UTC
[kudu] 02/02: [tools] add --use-upsert option for 'perf loadgen'
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit eaf558527a9f996e68515bf190dc7cf1ab9931a3
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sat Sep 12 20:27:41 2020 -0700
[tools] add --use-upsert option for 'perf loadgen'
With the newly introduced --use_upsert option, it's possible to run the
loadgen tool against the same table many times, simulating update
workloads.
This patch also adds a corresponding test scenario to the kudu-tool-test
suite.
In addition, I did a minor clean-up of the code in tool_action_perf.cc.
Change-Id: I52aa9513d82aa4420bcf57dc7f535a3fc32df792
Reviewed-on: http://gerrit.cloudera.org:8080/16445
Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
Tested-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/tools/kudu-tool-test.cc | 21 ++++++++++
src/kudu/tools/tool_action_perf.cc | 86 +++++++++++++++++++++++---------------
2 files changed, 74 insertions(+), 33 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index b0802eb..6edc8ff 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2297,6 +2297,27 @@ TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundRandomValuesIgnoreDeprecated) {
"bench_auto_flush_background_random_values_ignore_deprecated"));
}
+// Run the loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, writing the generated
+// data using UPSERT instead of INSERT.
+TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundUseUpsert) {
+ NO_FATALS(RunLoadgen(
+ 1 /* num_tservers */,
+ {
+ "--num_rows_per_thread=4096",
+ "--num_threads=8",
+ "--run_scan",
+ "--string_len=8",
+ // Use UPSERT (default is to use INSERT) operations for writing rows.
+ "--use_upsert",
+ // Use random values: with many threads writing many rows, randomly
+ // generated primary keys may collide; that would be an error for INSERT,
+ // but not for UPSERT, so no errors are expected here.
+ "--use_random_pk",
+ "--use_random_non_pk",
+ },
+ "bench_auto_flush_background_use_upsert"));
+}
+
// Run the loadgen benchmark in MANUAL_FLUSH mode.
TEST_F(ToolTest, TestLoadgenManualFlush) {
NO_FATALS(RunLoadgen(3,
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 9aa19af..d5b3d53 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -172,7 +172,6 @@
#include <limits>
#include <memory>
#include <mutex>
-#include <numeric>
#include <string>
#include <thread>
#include <unordered_map>
@@ -232,14 +231,13 @@ using kudu::TypeInfo;
using kudu::client::KuduClient;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduError;
-using kudu::client::KuduInsert;
-using kudu::client::KuduDelete;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
+using kudu::client::KuduWriteOperation;
using kudu::clock::LogicalClock;
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::consensus::ConsensusMetadata;
@@ -249,8 +247,6 @@ using kudu::log::LogAnchorRegistry;
using kudu::tablet::RowIteratorOptions;
using kudu::tablet::Tablet;
using kudu::tablet::TabletMetadata;
-using kudu::client::KuduWriteOperation;
-using std::accumulate;
using std::cerr;
using std::cout;
using std::endl;
@@ -367,6 +363,9 @@ DEFINE_bool(use_random_pk, false,
DEFINE_bool(use_random_non_pk, false,
"Whether to use random numbers instead of sequential ones for non-primary key "
"columns.");
+DEFINE_bool(use_upsert, false,
+ "Whether to use UPSERT instead of INSERT to store the generated "
+ "data in the table.");
namespace kudu {
namespace tools {
@@ -388,6 +387,19 @@ bool ValidatePartitionFlags() {
}
GROUP_FLAG_VALIDATOR(partition_flags, &ValidatePartitionFlags);
+const char* OpTypeToString(KuduWriteOperation::Type op_type) {  // Label for the write report.
+ switch (op_type) {
+ case KuduWriteOperation::INSERT:
+ return "INSERT";
+ case KuduWriteOperation::DELETE:
+ return "DELETE";
+ case KuduWriteOperation::UPSERT:
+ return "UPSERT";
+ default:  // Only the three types above pass the DCHECKs in this file.
+ LOG(FATAL) << Substitute("unsupported op_type $0", op_type);  // Assumed to abort.
+ }
+}
+
class Generator {
public:
enum Mode {
@@ -465,10 +477,12 @@ int64_t SpanPerThread(int num_key_columns) {
Status GenerateRowData(Generator* key_gen, Generator* value_gen, KuduPartialRow* row,
const string& fixed_string, KuduWriteOperation::Type op_type) {
const vector<ColumnSchema>& columns(row->schema()->columns());
- DCHECK(op_type == KuduWriteOperation::Type::INSERT ||
- op_type == KuduWriteOperation::Type::DELETE);
- size_t gen_column_count = op_type == KuduWriteOperation::Type::INSERT ?
- columns.size() : row->schema()->num_key_columns();
+ DCHECK(op_type == KuduWriteOperation::INSERT ||
+ op_type == KuduWriteOperation::DELETE ||
+ op_type == KuduWriteOperation::UPSERT);
+ const size_t gen_column_count = op_type == KuduWriteOperation::DELETE
+ ? row->schema()->num_key_columns()
+ : columns.size();
// Seperate key Generator and value Generator, so we can generate the same primary keys
// when perform DELETE operations.
Generator* gen = key_gen;
@@ -598,36 +612,38 @@ WriteResults GeneratorThread(const client::sp::shared_ptr<KuduClient>& client,
// Planning for non-intersecting ranges for different generator threads
// in sequential generation mode.
- const int64_t gen_span = SpanPerThread(KuduSchema::ToSchema(table->schema()).num_key_columns());
+ const int64_t gen_span = SpanPerThread(KuduSchema::ToSchema(
+ table->schema()).num_key_columns());
const int64_t gen_seed = gen_idx * gen_span + gen_seq_start;
Generator key_gen(key_gen_mode, gen_seed, FLAGS_string_len);
Generator value_gen(value_gen_mode, gen_seed, FLAGS_string_len);
+ unique_ptr<KuduWriteOperation> op;
for (; num_rows_per_gen < 0 || idx < num_rows_per_gen; ++idx) {
switch (op_type) {
- case KuduWriteOperation::Type::INSERT: {
- unique_ptr<KuduInsert> insert_op(table->NewInsert());
- RETURN_NOT_OK(GenerateRowData(&key_gen, &value_gen, insert_op->mutable_row(),
- FLAGS_string_fixed, op_type));
- RETURN_NOT_OK(session->Apply(insert_op.release()));
+ case KuduWriteOperation::INSERT:
+ op.reset(table->NewInsert());
break;
- }
- case KuduWriteOperation::Type::DELETE: {
- unique_ptr<KuduDelete> delete_op(table->NewDelete());
- RETURN_NOT_OK(GenerateRowData(&key_gen, nullptr, delete_op->mutable_row(),
- FLAGS_string_fixed, op_type));
- RETURN_NOT_OK(session->Apply(delete_op.release()));
+ case KuduWriteOperation::DELETE:
+ op.reset(table->NewDelete());
+ break;
+ case KuduWriteOperation::UPSERT:
+ op.reset(table->NewUpsert());
break;
- }
default:
- LOG(FATAL) << "Unknown op_type=" << op_type;
+ LOG(FATAL) << Substitute("unknown op_type $0", op_type);
}
+ RETURN_NOT_OK(GenerateRowData(
+ &key_gen,
+ op_type == KuduWriteOperation::DELETE ? nullptr : &value_gen,
+ op->mutable_row(),
+ FLAGS_string_fixed,
+ op_type));
+ RETURN_NOT_OK(session->Apply(op.release()));
if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0) {
session->FlushAsync(nullptr);
}
}
- RETURN_NOT_OK(session->Flush());
-
- return Status::OK();
+ return session->Flush();
};
WriteResults results;
@@ -658,8 +674,9 @@ WriteResults GeneratorThread(const client::sp::shared_ptr<KuduClient>& client,
WriteResults GenerateWriteRows(const ClientFactory& client_factory,
const string& table_name,
KuduWriteOperation::Type op_type) {
- DCHECK(op_type == KuduWriteOperation::Type::INSERT ||
- op_type == KuduWriteOperation::Type::DELETE);
+ DCHECK(op_type == KuduWriteOperation::INSERT ||
+ op_type == KuduWriteOperation::DELETE ||
+ op_type == KuduWriteOperation::UPSERT);
const size_t gen_num = FLAGS_num_threads;
vector<WriteResults> results(gen_num);
@@ -685,7 +702,7 @@ WriteResults GenerateWriteRows(const ClientFactory& client_factory,
std::max(combined.latest_observed_timestamp, r.latest_observed_timestamp);
}
cout << endl
- << (op_type == KuduWriteOperation::Type::INSERT ? "INSERT" : "DELETE") << " report" << endl
+ << OpTypeToString(op_type) << " report" << endl
<< " rows total: " << combined.row_count << endl
<< " time total: " << time_total_ms << " ms" << endl;
if (combined.row_count != 0 && combined.err_count == 0) {
@@ -788,8 +805,10 @@ Status TestLoadGenerator(const RunnerContext& context) {
CHECK_OK(CreateKuduClient(context, &client));
return client;
};
- WriteResults write_results =
- GenerateWriteRows(client_factory, table_name, KuduWriteOperation::Type::INSERT);
+ WriteResults write_results = GenerateWriteRows(
+ client_factory,
+ table_name,
+ FLAGS_use_upsert ? KuduWriteOperation::UPSERT : KuduWriteOperation::INSERT);
RETURN_NOT_OK(write_results.status);
client->SetLatestObservedTimestamp(write_results.latest_observed_timestamp);
if (FLAGS_run_scan) {
@@ -808,8 +827,8 @@ Status TestLoadGenerator(const RunnerContext& context) {
}
if (FLAGS_run_cleanup) {
- RETURN_NOT_OK(
- GenerateWriteRows(client_factory, table_name, KuduWriteOperation::Type::DELETE).status);
+ RETURN_NOT_OK(GenerateWriteRows(
+ client_factory, table_name, KuduWriteOperation::DELETE).status);
}
if (is_auto_table && !FLAGS_keep_auto_table) {
@@ -948,6 +967,7 @@ unique_ptr<Mode> BuildPerfMode() {
.AddOptionalParameter("use_random")
.AddOptionalParameter("use_random_pk")
.AddOptionalParameter("use_random_non_pk")
+ .AddOptionalParameter("use_upsert")
.Build();
unique_ptr<Action> table_scan =