You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/09/03 04:42:13 UTC

[kudu] branch master updated: [tools] Add --fault_tolerant to some CLI tools

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d4c95f375 [tools] Add --fault_tolerant to some CLI tools
d4c95f375 is described below

commit d4c95f37531c56b2af3250c763c0b74fd49987c9
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Wed Aug 3 19:08:48 2022 +0800

    [tools] Add --fault_tolerant to some CLI tools
    
    Add --fault_tolerant to 'kudu table copy', 'kudu table scan',
    and 'kudu perf table_scan' to make the scanner fault-tolerant,
    with the results returned in primary key order per tablet.
    This patch also adds some other flags that were missing from
    'kudu table copy' (--fill_cache and --scan_batch_size).
    
    Change-Id: I1a05792db9dceb5e774ce1602158bb636bba04d0
    Reviewed-on: http://gerrit.cloudera.org:8080/18813
    Reviewed-by: Yifan Zhang <ch...@163.com>
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/client/scan_configuration.cc |  2 ++
 src/kudu/tools/kudu-tool-test.cc      | 67 +++++++++++++++++++++++++++++++++++
 src/kudu/tools/table_scanner.cc       | 15 +++++++-
 src/kudu/tools/tool_action_perf.cc    |  9 ++++-
 src/kudu/tools/tool_action_table.cc   |  6 ++++
 5 files changed, 97 insertions(+), 2 deletions(-)

diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
index 1d5968a2a..edf0921f9 100644
--- a/src/kudu/client/scan_configuration.cc
+++ b/src/kudu/client/scan_configuration.cc
@@ -168,6 +168,8 @@ Status ScanConfiguration::SetReadMode(KuduScanner::ReadMode read_mode) {
 
 Status ScanConfiguration::SetFaultTolerant(bool fault_tolerant) {
   if (fault_tolerant) {
+    // TODO(yingchun): this will overwrite the user set read mode, maybe it
+    // should return error if there is any conflict.
     RETURN_NOT_OK(SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
   }
   is_fault_tolerant_ = fault_tolerant;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index a0c89b40c..ba062551b 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -3694,6 +3694,28 @@ TEST_F(ToolTest, PerfTableScanReplicaSelection) {
   }
 }
 
+TEST_F(ToolTest, PerfTableScanFaultTolerant) {
+  constexpr const char* const kTableName = "perf.table_scan.fault_tolerant";
+  NO_FATALS(RunLoadgen(1,
+                       {
+                           "--num_threads=8",
+                           "--num_rows_per_thread=111",
+                       },
+                       kTableName));
+  for (const auto& flag : {"true", "false"}) {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("perf table_scan $0 $1 --fault_tolerant=$2",
+                   cluster_->master()->bound_rpc_addr().ToString(), kTableName, flag),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_EQ(1, out_lines.size()) << out;
+    ASSERT_STR_CONTAINS(out, "Total count 888 ");
+  }
+}
+
 TEST_F(ToolTest, PerfTableScanBatchSize) {
   constexpr const char* const kTableName = "perf.table_scan.batch_size";
   NO_FATALS(RunLoadgen(1,
@@ -5030,6 +5052,51 @@ TEST_F(ToolTest, TableScanCustomBatchSize) {
   }
 }
 
+TEST_F(ToolTest, TableScanFaultTolerant) {
+  constexpr const char* const kTableName = "kudu.table.scan.fault_tolerant";
+  NO_FATALS(RunLoadgen(1,
+                       {
+                           "--num_threads=8",
+                           "--num_rows_per_thread=111",
+                       },
+                       kTableName));
+  for (const auto& flag : {"true", "false"}) {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("table scan $0 $1 --fault_tolerant=$2",
+                   cluster_->master()->bound_rpc_addr().ToString(), kTableName, flag),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out, "Total count 888 ");
+  }
+}
+
+TEST_F(ToolTest, TableCopyFaultTolerant) {
+  constexpr const char* const kTableName = "kudu.table.copy.fault_tolerant.from";
+  constexpr const char* const kNewTableName = "kudu.table.copy.fault_tolerant.to";
+  NO_FATALS(RunLoadgen(1,
+                       {
+                           "--num_threads=8",
+                           "--num_rows_per_thread=111",
+                       },
+                       kTableName));
+  for (const auto& flag : {"true", "false"}) {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("table copy $0 $1 $2 --dst_table=$3 --write_type=upsert --fault_tolerant=$4",
+                   cluster_->master()->bound_rpc_addr().ToString(), kTableName,
+                   cluster_->master()->bound_rpc_addr().ToString(), kNewTableName,
+                   flag),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out, "Total count 888 ");
+  }
+}
+
 TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
   for (const auto& arg : GenerateArgs()) {
     NO_FATALS(RunCopyTableCheck(arg));
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index f60c3a386..5ed43d0f6 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -24,10 +24,11 @@
 #include <iomanip>
 #include <iostream>
 #include <iterator>
-#include <optional>
 #include <map>
 #include <memory>
+#include <optional>
 #include <set>
+#include <type_traits>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -98,6 +99,10 @@ DEFINE_string(create_table_hash_bucket_nums, "",
               "The number of hash buckets in each hash dimension seperated by comma");
 DEFINE_bool(fill_cache, true,
             "Whether to fill block cache when scanning.");
+DEFINE_bool(fault_tolerant, false,
+            "Whether to make scans resumable at another tablet server if current server fails. "
+            "Fault-tolerant scans typically have lower throughput than non fault-tolerant scans, "
+            "but the results are returned in primary key order for a single tablet.");
 DEFINE_string(predicates, "",
               "Query predicates on columns. Unlike traditional SQL syntax, "
               "the scan tool's simple query predicates are represented in a "
@@ -668,6 +673,14 @@ Status TableScanner::StartWork(WorkType type) {
   }
   RETURN_NOT_OK(builder.SetSelection(replica_selection_));
   RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
+  if (FLAGS_fault_tolerant) {
+    // TODO(yingchun): push down this judgement to ScanConfiguration::SetFaultTolerant
+    if (mode_ && *mode_ != KuduScanner::READ_AT_SNAPSHOT) {
+      return Status::InvalidArgument(Substitute("--fault_tolerant conflicts with "
+          "the non-READ_AT_SNAPSHOT read mode"));
+    }
+    RETURN_NOT_OK(builder.SetFaultTolerant());
+  }
 
   // Set projection if needed.
   if (type == WorkType::kScan) {
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 9ff420a3a..09cf70ac5 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -175,6 +175,7 @@
 #include <optional>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <unordered_map>
 #include <vector>
 
@@ -207,7 +208,6 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/rowset.h"
-#include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -224,6 +224,12 @@
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
 
+namespace kudu {
+namespace tablet {
+class Tablet;
+}  // namespace tablet
+}  // namespace kudu
+
 using kudu::ColumnSchema;
 using kudu::KuduPartialRow;
 using kudu::Stopwatch;
@@ -1073,6 +1079,7 @@ unique_ptr<Mode> BuildPerfMode() {
       .AddOptionalParameter("row_count_only")
       .AddOptionalParameter("report_scanner_stats")
       .AddOptionalParameter("scan_batch_size")
+      .AddOptionalParameter("fault_tolerant")
       .AddOptionalParameter("fill_cache")
       .AddOptionalParameter("num_threads")
       .AddOptionalParameter("predicates")
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 7bfe717bd..fa683e8b4 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -175,6 +175,7 @@ DEFINE_uint32(reserve_seconds, 604800,
               "Reserve seconds before purging a soft-deleted table.");
 
 DECLARE_bool(create_table);
+DECLARE_bool(fault_tolerant);
 DECLARE_int32(create_table_replication_factor);
 DECLARE_bool(row_count_only);
 DECLARE_bool(show_scanner_stats);
@@ -813,6 +814,7 @@ Status CopyTable(const RunnerContext& context) {
 
   TableScanner scanner(src_client, src_table_name, dst_client, dst_table_name);
   scanner.SetOutput(&cout);
+  scanner.SetScanBatchSize(FLAGS_scan_batch_size);
   return scanner.StartCopy();
 }
 
@@ -1784,6 +1786,7 @@ unique_ptr<Mode> BuildTableMode() {
       .AddOptionalParameter("row_count_only")
       .AddOptionalParameter("report_scanner_stats")
       .AddOptionalParameter("scan_batch_size")
+      .AddOptionalParameter("fault_tolerant")
       .AddOptionalParameter("fill_cache")
       .AddOptionalParameter("num_threads")
       .AddOptionalParameter("predicates")
@@ -1805,8 +1808,11 @@ unique_ptr<Mode> BuildTableMode() {
       .AddOptionalParameter("create_table_hash_bucket_nums")
       .AddOptionalParameter("create_table_replication_factor")
       .AddOptionalParameter("dst_table")
+      .AddOptionalParameter("fault_tolerant")
+      .AddOptionalParameter("fill_cache")
       .AddOptionalParameter("num_threads")
       .AddOptionalParameter("predicates")
+      .AddOptionalParameter("scan_batch_size")
       .AddOptionalParameter("tablets")
       .AddOptionalParameter("write_type")
       .Build();