You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/09/14 14:52:18 UTC

[kudu] 02/02: [tools] add --use_upsert option for 'perf loadgen'

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit eaf558527a9f996e68515bf190dc7cf1ab9931a3
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sat Sep 12 20:27:41 2020 -0700

    [tools] add --use_upsert option for 'perf loadgen'
    
    With the newly introduced --use_upsert option, it's possible to run the
    loadgen tool against the same table many times, simulating update
    workloads.
    
    This patch also adds a corresponding test scenario to the kudu-tool-test
    suite.
    
    In addition, I did a minor clean-up of the code in tool_action_perf.cc.
    
    Change-Id: I52aa9513d82aa4420bcf57dc7f535a3fc32df792
    Reviewed-on: http://gerrit.cloudera.org:8080/16445
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/tools/kudu-tool-test.cc   | 21 ++++++++++
 src/kudu/tools/tool_action_perf.cc | 86 +++++++++++++++++++++++---------------
 2 files changed, 74 insertions(+), 33 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index b0802eb..6edc8ff 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2297,6 +2297,27 @@ TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundRandomValuesIgnoreDeprecated) {
       "bench_auto_flush_background_random_values_ignore_deprecated"));
 }
 
+// Run loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, writing the generated
+// data using UPSERT instead of INSERT.
+TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundUseUpsert) {
+  NO_FATALS(RunLoadgen(
+      1 /* num_tservers */,
+      {
+        "--num_rows_per_thread=4096",
+        "--num_threads=8",
+        "--run_scan",
+        "--string_len=8",
+        // Use UPSERT (default is to use INSERT) operations for writing rows.
+        "--use_upsert",
+        // Use random values: even if there are many threads writing many
+        // rows, no errors are expected because of using UPSERT instead of
+        // INSERT.
+        "--use_random_pk",
+        "--use_random_non_pk",
+      },
+      "bench_auto_flush_background_use_upsert"));
+}
+
 // Run the loadgen benchmark in MANUAL_FLUSH mode.
 TEST_F(ToolTest, TestLoadgenManualFlush) {
   NO_FATALS(RunLoadgen(3,
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 9aa19af..d5b3d53 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -172,7 +172,6 @@
 #include <limits>
 #include <memory>
 #include <mutex>
-#include <numeric>
 #include <string>
 #include <thread>
 #include <unordered_map>
@@ -232,14 +231,13 @@ using kudu::TypeInfo;
 using kudu::client::KuduClient;
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduError;
-using kudu::client::KuduInsert;
-using kudu::client::KuduDelete;
 using kudu::client::KuduScanner;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
+using kudu::client::KuduWriteOperation;
 using kudu::clock::LogicalClock;
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::consensus::ConsensusMetadata;
@@ -249,8 +247,6 @@ using kudu::log::LogAnchorRegistry;
 using kudu::tablet::RowIteratorOptions;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletMetadata;
-using kudu::client::KuduWriteOperation;
-using std::accumulate;
 using std::cerr;
 using std::cout;
 using std::endl;
@@ -367,6 +363,9 @@ DEFINE_bool(use_random_pk, false,
 DEFINE_bool(use_random_non_pk, false,
             "Whether to use random numbers instead of sequential ones for non-primary key "
             "columns.");
+DEFINE_bool(use_upsert, false,
+            "Whether to use UPSERT instead of INSERT to store the generated "
+            "data into the table");
 
 namespace kudu {
 namespace tools {
@@ -388,6 +387,19 @@ bool ValidatePartitionFlags() {
 }
 GROUP_FLAG_VALIDATOR(partition_flags, &ValidatePartitionFlags);
 
+const char* OpTypeToString(KuduWriteOperation::Type op_type) {
+  switch (op_type) {
+    case KuduWriteOperation::INSERT:
+      return "INSERT";
+    case KuduWriteOperation::DELETE:
+      return "DELETE";
+    case KuduWriteOperation::UPSERT:
+      return "UPSERT";
+    default:
+      LOG(FATAL) << Substitute("unsupported op_type $0", op_type);
+  }
+}
+
 class Generator {
  public:
   enum Mode {
@@ -465,10 +477,12 @@ int64_t SpanPerThread(int num_key_columns) {
 Status GenerateRowData(Generator* key_gen, Generator* value_gen, KuduPartialRow* row,
                        const string& fixed_string, KuduWriteOperation::Type op_type) {
   const vector<ColumnSchema>& columns(row->schema()->columns());
-  DCHECK(op_type == KuduWriteOperation::Type::INSERT ||
-      op_type == KuduWriteOperation::Type::DELETE);
-  size_t gen_column_count = op_type == KuduWriteOperation::Type::INSERT ?
-      columns.size() : row->schema()->num_key_columns();
+  DCHECK(op_type == KuduWriteOperation::INSERT ||
+         op_type == KuduWriteOperation::DELETE ||
+         op_type == KuduWriteOperation::UPSERT);
+  const size_t gen_column_count = op_type == KuduWriteOperation::DELETE
+      ? row->schema()->num_key_columns()
+      : columns.size();
   // Seperate key Generator and value Generator, so we can generate the same primary keys
   // when perform DELETE operations.
   Generator* gen = key_gen;
@@ -598,36 +612,38 @@ WriteResults GeneratorThread(const client::sp::shared_ptr<KuduClient>& client,
 
     // Planning for non-intersecting ranges for different generator threads
     // in sequential generation mode.
-    const int64_t gen_span = SpanPerThread(KuduSchema::ToSchema(table->schema()).num_key_columns());
+    const int64_t gen_span = SpanPerThread(KuduSchema::ToSchema(
+        table->schema()).num_key_columns());
     const int64_t gen_seed = gen_idx * gen_span + gen_seq_start;
     Generator key_gen(key_gen_mode, gen_seed, FLAGS_string_len);
     Generator value_gen(value_gen_mode, gen_seed, FLAGS_string_len);
+    unique_ptr<KuduWriteOperation> op;
     for (; num_rows_per_gen < 0 || idx < num_rows_per_gen; ++idx) {
       switch (op_type) {
-        case KuduWriteOperation::Type::INSERT: {
-          unique_ptr<KuduInsert> insert_op(table->NewInsert());
-          RETURN_NOT_OK(GenerateRowData(&key_gen, &value_gen, insert_op->mutable_row(),
-                                        FLAGS_string_fixed, op_type));
-          RETURN_NOT_OK(session->Apply(insert_op.release()));
+        case KuduWriteOperation::INSERT:
+          op.reset(table->NewInsert());
           break;
-        }
-        case KuduWriteOperation::Type::DELETE: {
-          unique_ptr<KuduDelete> delete_op(table->NewDelete());
-          RETURN_NOT_OK(GenerateRowData(&key_gen, nullptr, delete_op->mutable_row(),
-                                        FLAGS_string_fixed, op_type));
-          RETURN_NOT_OK(session->Apply(delete_op.release()));
+        case KuduWriteOperation::DELETE:
+          op.reset(table->NewDelete());
+          break;
+        case KuduWriteOperation::UPSERT:
+          op.reset(table->NewUpsert());
           break;
-        }
         default:
-          LOG(FATAL) << "Unknown op_type=" << op_type;
+          LOG(FATAL) << Substitute("unknown op_type $0", op_type);
       }
+      RETURN_NOT_OK(GenerateRowData(
+          &key_gen,
+          op_type == KuduWriteOperation::DELETE ? nullptr : &value_gen,
+          op->mutable_row(),
+          FLAGS_string_fixed,
+          op_type));
+      RETURN_NOT_OK(session->Apply(op.release()));
       if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0) {
         session->FlushAsync(nullptr);
       }
     }
-    RETURN_NOT_OK(session->Flush());
-
-    return Status::OK();
+    return session->Flush();
   };
 
   WriteResults results;
@@ -658,8 +674,9 @@ WriteResults GeneratorThread(const client::sp::shared_ptr<KuduClient>& client,
 WriteResults GenerateWriteRows(const ClientFactory& client_factory,
                                const string& table_name,
                                KuduWriteOperation::Type op_type) {
-  DCHECK(op_type == KuduWriteOperation::Type::INSERT ||
-         op_type == KuduWriteOperation::Type::DELETE);
+  DCHECK(op_type == KuduWriteOperation::INSERT ||
+         op_type == KuduWriteOperation::DELETE ||
+         op_type == KuduWriteOperation::UPSERT);
 
   const size_t gen_num = FLAGS_num_threads;
   vector<WriteResults> results(gen_num);
@@ -685,7 +702,7 @@ WriteResults GenerateWriteRows(const ClientFactory& client_factory,
         std::max(combined.latest_observed_timestamp, r.latest_observed_timestamp);
   }
   cout << endl
-       << (op_type == KuduWriteOperation::Type::INSERT ? "INSERT" : "DELETE") << " report" << endl
+       << OpTypeToString(op_type) << " report" << endl
        << "    rows total: " << combined.row_count << endl
        << "    time total: " << time_total_ms << " ms" << endl;
   if (combined.row_count != 0 && combined.err_count == 0) {
@@ -788,8 +805,10 @@ Status TestLoadGenerator(const RunnerContext& context) {
     CHECK_OK(CreateKuduClient(context, &client));
     return client;
   };
-  WriteResults write_results =
-      GenerateWriteRows(client_factory, table_name, KuduWriteOperation::Type::INSERT);
+  WriteResults write_results = GenerateWriteRows(
+      client_factory,
+      table_name,
+      FLAGS_use_upsert ? KuduWriteOperation::UPSERT : KuduWriteOperation::INSERT);
   RETURN_NOT_OK(write_results.status);
   client->SetLatestObservedTimestamp(write_results.latest_observed_timestamp);
   if (FLAGS_run_scan) {
@@ -808,8 +827,8 @@ Status TestLoadGenerator(const RunnerContext& context) {
   }
 
   if (FLAGS_run_cleanup) {
-    RETURN_NOT_OK(
-        GenerateWriteRows(client_factory, table_name, KuduWriteOperation::Type::DELETE).status);
+    RETURN_NOT_OK(GenerateWriteRows(
+        client_factory, table_name, KuduWriteOperation::DELETE).status);
   }
 
   if (is_auto_table && !FLAGS_keep_auto_table) {
@@ -948,6 +967,7 @@ unique_ptr<Mode> BuildPerfMode() {
           .AddOptionalParameter("use_random")
           .AddOptionalParameter("use_random_pk")
           .AddOptionalParameter("use_random_non_pk")
+          .AddOptionalParameter("use_upsert")
           .Build();
 
   unique_ptr<Action> table_scan =