Posted to commits@kudu.apache.org by al...@apache.org on 2021/03/03 00:55:07 UTC

[kudu] branch master updated: KUDU-2612 add perf scenario to TxnWriteOpsITest

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 740e915  KUDU-2612 add perf scenario to TxnWriteOpsITest
740e915 is described below

commit 740e9150d7072d95599951ee998df58d8ffe55b6
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Feb 26 23:43:09 2021 -0800

    KUDU-2612 add perf scenario to TxnWriteOpsITest
    
    This patch adds a WriteOpPerf scenario to TxnWriteOpsITest.  The new
    scenario evaluates the --tablet_max_pending_txn_write_ops setting for
    tablet servers: it runs for a short time and counts the number of
    completed Write RPCs in the context of a transactional session.  The
    scenario focuses on single-row write operations to pinpoint the
    latency of processing txn write operations while transaction
    participants are being registered.  The current locking approach for
    the TxnOpDispatcher's queue while submitting the accumulated
    operations is probably not optimal.  It also appears that the
    exponential back-off timing built into the client's RPC retry logic is
    an important factor: I saw significant deviation in the number of
    completed RPCs from run to run.
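
    For context, here is a minimal sketch of the transactional single-row
    write pattern the scenario exercises (Kudu C++ client API, mirroring
    the test code added below; it assumes an already-created KuduClient
    'client', an open KuduTable 'table', and an int64 'key', and uses
    CHECK_OK in place of proper error handling):
      shared_ptr<KuduTransaction> txn;
      CHECK_OK(client->NewTransaction(&txn));
      shared_ptr<KuduSession> session;
      CHECK_OK(txn->CreateSession(&session));
      CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
      // In AUTO_FLUSH_SYNC mode, each Apply() results in a separate Write RPC.
      unique_ptr<KuduInsert> op(table->NewInsert());
      CHECK_OK(op->mutable_row()->SetInt64(0, key));
      CHECK_OK(session->Apply(op.release()));
      CHECK_OK(txn->Commit());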
    
    Below are results averaged over 100 runs of the benchmark scenario,
    varying --max_pending_txn_write_ops as shown below and keeping the
    following settings fixed:
      --prime_connections_to_tservers=true
      --clients=8
      --sessions_per_client=1
      --benchmark_run_time_ms=50
    
    I used the following script to get the accumulated results for
    writes in a transactional context:
      for i in {0..99}; do
        ./bin/txn_write_ops-itest --gtest_filter='*WriteOpPerf' \
            --max_pending_txn_write_ops=<X> 2>&1 | grep 'write RPCs' | \
            awk '{print $9}'; done | \
            awk 'BEGIN {sum=0} {sum += $0} END {print sum}'
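
    The script prints the accumulated sum over the 100 runs; the per-run
    averages reported below are that sum divided by 100.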
    
    RELEASE build:
      --max_pending_txn_write_ops=0  : 442.13 RPCs
      --max_pending_txn_write_ops=2  : 494.33 RPCs
      --max_pending_txn_write_ops=5  : 471.90 RPCs
      --max_pending_txn_write_ops=10 : 490.22 RPCs
      --max_pending_txn_write_ops=20 : 469.21 RPCs
    
    DEBUG   build:
      --max_pending_txn_write_ops=0  : 83.74 RPCs
      --max_pending_txn_write_ops=2  : 98.18 RPCs
      --max_pending_txn_write_ops=5  : 95.23 RPCs
      --max_pending_txn_write_ops=10 : 98.12 RPCs
      --max_pending_txn_write_ops=20 : 94.40 RPCs
    
    I also measured the performance of transactional vs non-transactional
    writes over various time intervals in a RELEASE build, using the
    --max_pending_txn_write_ops=2 setting for the transactional writes:
    
    50ms:
      non-transactional:    588.33 RPCs (11767 req/sec)
          transactional:    487.82 RPCs ( 9756 req/sec)
    
    3000ms:
      non-transactional:  40041.63 RPCs (13347 req/sec)
          transactional:  39759.07 RPCs (13253 req/sec)
    
    8000ms:
      non-transactional: 106832.37 RPCs (13354 req/sec)
          transactional: 105922.65 RPCs (13240 req/sec)
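
    Each req/sec rate above is the averaged RPC count divided by the run
    time; e.g. for the 50ms non-transactional case, 588.33 RPCs / 0.050 s
    is approximately 11767 req/sec.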
    
    Change-Id: I0370dbb289a4e1cfc154205ae92e13da510682b4
    Reviewed-on: http://gerrit.cloudera.org:8080/17105
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/integration-tests/txn_write_ops-itest.cc | 191 +++++++++++++++++++++-
 1 file changed, 190 insertions(+), 1 deletion(-)

diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 0dcf553..cb7a8f0 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -26,6 +26,7 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <numeric>
 #include <ostream>
 #include <set>
 #include <string>
@@ -36,7 +37,7 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <gflags/gflags_declare.h>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -52,6 +53,7 @@
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -83,8 +85,10 @@ using kudu::KuduPartialRow;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduColumnSchema;
+using kudu::client::KuduDeleteIgnore;
 using kudu::client::KuduError;
 using kudu::client::KuduInsert;
+using kudu::client::KuduInsertIgnore;
 using kudu::client::KuduScanBatch;
 using kudu::client::KuduScanner;
 using kudu::client::KuduSchema;
@@ -114,6 +118,21 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+// The run-time flags below are for the TxnWriteOpsITest.WriteOpPerf scenario.
+DEFINE_bool(prime_connections_to_tservers, true,
+            "whether to open connections to tablet servers prior to sending "
+            "transactional write operations");
+DEFINE_uint32(clients, 8, "number of Kudu clients to run");
+DEFINE_uint32(sessions_per_client, 1,
+              "number of concurrent sessions per Kudu client: "
+              "there will be --clients * --sessions_per_client concurrent "
+              "writer threads in total, i.e. one writer thread per session");
+DEFINE_uint32(benchmark_run_time_ms, 50,
+              "time interval to run the benchmark, in milliseconds");
+DEFINE_uint32(max_pending_txn_write_ops, 10,
+              "setting for tserver's --tablet_max_pending_txn_write_ops flag");
+DEFINE_bool(txn_enabled, true, "whether to use transactional sessions");
+
 DECLARE_bool(tserver_txn_write_op_handling_enabled);
 DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
@@ -140,6 +159,20 @@ unique_ptr<KuduInsert> BuildInsert(KuduTable* table, int64_t key) {
   return op;
 }
 
+unique_ptr<KuduInsertIgnore> BuildInsertIgnore(KuduTable* table, int64_t key) {
+  unique_ptr<KuduInsertIgnore> op(table->NewInsertIgnore());
+  KuduPartialRow* row = op->mutable_row();
+  CHECK_OK(row->SetInt64(0, key));
+  return op;
+}
+
+unique_ptr<KuduDeleteIgnore> BuildDeleteIgnore(KuduTable* table, int64_t key) {
+  unique_ptr<KuduDeleteIgnore> op(table->NewDeleteIgnore());
+  KuduPartialRow* row = op->mutable_row();
+  CHECK_OK(row->SetInt64(0, key));
+  return op;
+}
+
 int64_t GetTxnId(const shared_ptr<KuduTransaction>& txn) {
   string txn_token;
   CHECK_OK(txn->Serialize(&txn_token));
@@ -418,6 +451,162 @@ TEST_F(TxnWriteOpsITest, FrequentElections) {
   ASSERT_EQ(row_count, count);
 }
 
+// This scenario runs a benchmark to measure the rate of transactional write
+// operations. It is intended to evaluate the --tablet_max_pending_txn_write_ops
+// flag setting for tablet servers.
+TEST_F(TxnWriteOpsITest, WriteOpPerf) {
+  const vector<string> ts_flags = {
+    Substitute("--tablet_max_pending_txn_write_ops=$0",
+               FLAGS_max_pending_txn_write_ops),
+  };
+  const vector<string> master_flags = {
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    "--txn_manager_enabled=true",
+
+    // Scenarios based on this test fixture assume the txn status table
+    // is created at start, not on first transaction-related operation.
+    "--txn_manager_lazily_initialized=false",
+  };
+  NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers));
+  NO_FATALS(Prepare());
+
+  const auto num_clients = FLAGS_clients;
+  vector<shared_ptr<KuduClient>> clients;
+  clients.reserve(num_clients);
+  for (auto i = 0; i < num_clients; ++i) {
+    KuduClientBuilder b;
+    b.default_admin_operation_timeout(kTimeout);
+    b.default_rpc_timeout(kTimeout);
+    shared_ptr<KuduClient> c;
+    ASSERT_OK(cluster_->CreateClient(&b, &c));
+    clients.emplace_back(std::move(c));
+  }
+
+  const bool txn_enabled = FLAGS_txn_enabled;
+  const auto num_sessions = num_clients * FLAGS_sessions_per_client;
+  vector<shared_ptr<KuduTransaction>> txns;
+  txns.reserve(num_sessions);
+  vector<shared_ptr<KuduSession>> sessions;
+  sessions.reserve(num_sessions);
+  for (auto i = 0; i < num_sessions; ++i) {
+    const auto client_idx = i % num_clients;
+    auto& c = clients[client_idx];
+    shared_ptr<KuduSession> s;
+    if (!txn_enabled) {
+      s = c->NewSession();
+    } else {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(c->NewTransaction(&txn));
+      ASSERT_OK(txn->CreateSession(&s));
+      txns.emplace_back(std::move(txn));
+    }
+    ASSERT_NE(nullptr, s.get());
+    ASSERT_OK(s->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    sessions.emplace_back(std::move(s));
+  }
+
+  // Run multiple writer threads (one thread per session), where every thread
+  // sends as many write operations as it can. For now, using INSERT operations:
+  // INSERT and INSERT_IGNORE are the only write operations supported by
+  // multi-row transaction sessions.
+  atomic<bool> done = false;
+  Barrier barrier(num_sessions + 1);
+  vector<thread> writers;
+  writers.reserve(num_sessions);
+  vector<Status> session_statuses(num_sessions);
+  vector<size_t> row_counters(num_sessions, 0);
+  const bool prime_connections = FLAGS_prime_connections_to_tservers;
+  for (auto session_idx = 0; session_idx < num_sessions; ++session_idx) {
+    writers.emplace_back([&, session_idx] {
+      auto& session = sessions[session_idx];
+      const auto client_idx = session_idx % num_clients;
+      auto& c = clients[client_idx];
+      shared_ptr<KuduTable> table;
+      auto s = c->OpenTable(kTableName, &table);
+      if (PREDICT_FALSE(!s.ok())) {
+        session_statuses[session_idx] = s;
+        return;
+      }
+      if (prime_connections) {
+        // If requested, send several INSERT_IGNORE/DELETE_IGNORE operations
+        // to open connections to all tablet servers in the cluster.
+        // The number of rows is chosen to have at least one row per tablet:
+        // it's a hash-partitioned table with kNumPartitions tablets.
+        constexpr const auto kNumPreliminaryRows = kNumPartitions * 10;
+        shared_ptr<KuduSession> priming_session = c->NewSession();
+        CHECK_OK(priming_session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+        for (auto i = 0; i < kNumPreliminaryRows; ++i) {
+          unique_ptr<KuduInsertIgnore> op = BuildInsertIgnore(table.get(), i);
+          auto s = priming_session->Apply(op.release());
+          if (PREDICT_FALSE(!s.ok())) {
+            session_statuses[session_idx] = s;
+            return;
+          }
+        }
+        for (auto i = 0; i < kNumPreliminaryRows; ++i) {
+          unique_ptr<KuduDeleteIgnore> op = BuildDeleteIgnore(table.get(), i);
+          auto s = priming_session->Apply(op.release());
+          if (PREDICT_FALSE(!s.ok())) {
+            session_statuses[session_idx] = s;
+            return;
+          }
+        }
+      }
+      size_t op_idx = 0;
+      barrier.Wait();
+      while (!done) {
+        int64_t key = num_sessions * op_idx + session_idx;
+        unique_ptr<KuduInsert> op(BuildInsert(table.get(), key));
+        auto s = session->Apply(op.release());
+        if (PREDICT_FALSE(!s.ok())) {
+          session_statuses[session_idx] = s;
+          return;
+        }
+        // Every Write RPC results in one row because of AUTO_FLUSH_SYNC mode.
+        ++row_counters[session_idx];
+        ++op_idx;
+      }
+    });
+  }
+  const auto run_time = MonoDelta::FromMilliseconds(FLAGS_benchmark_run_time_ms);
+  barrier.Wait(); // start writers
+  SleepFor(run_time);
+  done = true;    // stop writers
+  std::for_each(writers.begin(), writers.end(), [](thread& t) { t.join(); });
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  for (auto i = 0; i < session_statuses.size(); ++i) {
+    SCOPED_TRACE(Substitute("session idx $0", i));
+    const auto& s = session_statuses[i];
+    ASSERT_OK(s);
+  }
+  for (auto& txn : txns) {
+    ASSERT_OK(txn->Commit());
+  }
+  // Sanity check: make sure all the transactions are reported as complete.
+  for (auto& txn : txns) {
+    bool is_complete = false;
+    Status completion_status;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(is_complete);
+  }
+
+  const size_t rows_total = std::accumulate(
+      row_counters.begin(), row_counters.end(), 0UL);
+  LOG(INFO) << Substitute("$0write RPCs completed: $1",
+                          txn_enabled ? "txn " : "", rows_total);
+  LOG(INFO) << Substitute(
+      "$0write RPC rate: $1 req/sec",
+      txn_enabled ? "txn " : "",
+      static_cast<double>(rows_total) / run_time.ToSeconds());
+
+  // Another sanity check: make sure all the rows have been persisted.
+  size_t count;
+  ASSERT_OK(CountRows(table_.get(), &count));
+  ASSERT_EQ(rows_total, count);
+}
+
 // Send a write operation to a tablet server in the context of non-existent
 // transaction. The server should respond back with appropriate error status.
 TEST_F(TxnWriteOpsITest, WriteOpForNonExistentTxn) {