You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/09/03 04:42:13 UTC
[kudu] branch master updated: [tools] Add --fault_tolerant to some CLI tools
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new d4c95f375 [tools] Add --fault_tolerant to some CLI tools
d4c95f375 is described below
commit d4c95f37531c56b2af3250c763c0b74fd49987c9
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Wed Aug 3 19:08:48 2022 +0800
[tools] Add --fault_tolerant to some CLI tools
Add --fault_tolerant to 'kudu table copy', 'kudu table scan' and
'kudu perf table_scan' to make the scanner fault-tolerant
and to return the results in primary key order within
each tablet. This patch also adds the --fill_cache and
--scan_batch_size flags that were missing from 'kudu table copy'.
Change-Id: I1a05792db9dceb5e774ce1602158bb636bba04d0
Reviewed-on: http://gerrit.cloudera.org:8080/18813
Reviewed-by: Yifan Zhang <ch...@163.com>
Tested-by: Alexey Serbin <al...@apache.org>
Reviewed-by: Alexey Serbin <al...@apache.org>
---
src/kudu/client/scan_configuration.cc | 2 ++
src/kudu/tools/kudu-tool-test.cc | 67 +++++++++++++++++++++++++++++++++++
src/kudu/tools/table_scanner.cc | 15 +++++++-
src/kudu/tools/tool_action_perf.cc | 9 ++++-
src/kudu/tools/tool_action_table.cc | 6 ++++
5 files changed, 97 insertions(+), 2 deletions(-)
diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
index 1d5968a2a..edf0921f9 100644
--- a/src/kudu/client/scan_configuration.cc
+++ b/src/kudu/client/scan_configuration.cc
@@ -168,6 +168,8 @@ Status ScanConfiguration::SetReadMode(KuduScanner::ReadMode read_mode) {
Status ScanConfiguration::SetFaultTolerant(bool fault_tolerant) {
if (fault_tolerant) {
+ // TODO(yingchun): this overwrites any read mode previously set by the user;
+ // maybe it should return an error if the two settings conflict.
RETURN_NOT_OK(SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
}
is_fault_tolerant_ = fault_tolerant;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index a0c89b40c..ba062551b 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -3694,6 +3694,28 @@ TEST_F(ToolTest, PerfTableScanReplicaSelection) {
}
}
+TEST_F(ToolTest, PerfTableScanFaultTolerant) {
+ constexpr const char* const kTableName = "perf.table_scan.fault_tolerant";
+ NO_FATALS(RunLoadgen(1,
+ {
+ "--num_threads=8",
+ "--num_rows_per_thread=111",
+ },
+ kTableName));
+ for (const auto& flag : {"true", "false"}) {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("perf table_scan $0 $1 --fault_tolerant=$2",
+ cluster_->master()->bound_rpc_addr().ToString(), kTableName, flag),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_EQ(1, out_lines.size()) << out;
+ ASSERT_STR_CONTAINS(out, "Total count 888 ");
+ }
+}
+
TEST_F(ToolTest, PerfTableScanBatchSize) {
constexpr const char* const kTableName = "perf.table_scan.batch_size";
NO_FATALS(RunLoadgen(1,
@@ -5030,6 +5052,51 @@ TEST_F(ToolTest, TableScanCustomBatchSize) {
}
}
+TEST_F(ToolTest, TableScanFaultTolerant) {
+ constexpr const char* const kTableName = "kudu.table.scan.fault_tolerant";
+ NO_FATALS(RunLoadgen(1,
+ {
+ "--num_threads=8",
+ "--num_rows_per_thread=111",
+ },
+ kTableName));
+ for (const auto& flag : {"true", "false"}) {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("table scan $0 $1 --fault_tolerant=$2",
+ cluster_->master()->bound_rpc_addr().ToString(), kTableName, flag),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out, "Total count 888 ");
+ }
+}
+
+TEST_F(ToolTest, TableCopyFaultTolerant) {
+ constexpr const char* const kTableName = "kudu.table.copy.fault_tolerant.from";
+ constexpr const char* const kNewTableName = "kudu.table.copy.fault_tolerant.to";
+ NO_FATALS(RunLoadgen(1,
+ {
+ "--num_threads=8",
+ "--num_rows_per_thread=111",
+ },
+ kTableName));
+ for (const auto& flag : {"true", "false"}) {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("table copy $0 $1 $2 --dst_table=$3 --write_type=upsert --fault_tolerant=$4",
+ cluster_->master()->bound_rpc_addr().ToString(), kTableName,
+ cluster_->master()->bound_rpc_addr().ToString(), kNewTableName,
+ flag),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out, "Total count 888 ");
+ }
+}
+
TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
for (const auto& arg : GenerateArgs()) {
NO_FATALS(RunCopyTableCheck(arg));
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index f60c3a386..5ed43d0f6 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -24,10 +24,11 @@
#include <iomanip>
#include <iostream>
#include <iterator>
-#include <optional>
#include <map>
#include <memory>
+#include <optional>
#include <set>
+#include <type_traits>
#include <gflags/gflags.h>
#include <glog/logging.h>
@@ -98,6 +99,10 @@ DEFINE_string(create_table_hash_bucket_nums, "",
"The number of hash buckets in each hash dimension seperated by comma");
DEFINE_bool(fill_cache, true,
"Whether to fill block cache when scanning.");
+DEFINE_bool(fault_tolerant, false,
+ "Whether to make scans resumable at another tablet server if current server fails. "
+ "Fault-tolerant scans typically have lower throughput than non fault-tolerant scans, "
+ "but the results are returned in primary key order for a single tablet.");
DEFINE_string(predicates, "",
"Query predicates on columns. Unlike traditional SQL syntax, "
"the scan tool's simple query predicates are represented in a "
@@ -668,6 +673,14 @@ Status TableScanner::StartWork(WorkType type) {
}
RETURN_NOT_OK(builder.SetSelection(replica_selection_));
RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
+ if (FLAGS_fault_tolerant) {
+ // TODO(yingchun): move this conflict check into ScanConfiguration::SetFaultTolerant
+ if (mode_ && *mode_ != KuduScanner::READ_AT_SNAPSHOT) {
+ return Status::InvalidArgument(Substitute("--fault_tolerant conflicts with "
+ "the non-READ_AT_SNAPSHOT read mode"));
+ }
+ RETURN_NOT_OK(builder.SetFaultTolerant());
+ }
// Set projection if needed.
if (type == WorkType::kScan) {
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 9ff420a3a..09cf70ac5 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -175,6 +175,7 @@
#include <optional>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_map>
#include <vector>
@@ -207,7 +208,6 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/tablet/rowset.h"
-#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
@@ -224,6 +224,12 @@
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
+namespace kudu {
+namespace tablet {
+class Tablet;
+} // namespace tablet
+} // namespace kudu
+
using kudu::ColumnSchema;
using kudu::KuduPartialRow;
using kudu::Stopwatch;
@@ -1073,6 +1079,7 @@ unique_ptr<Mode> BuildPerfMode() {
.AddOptionalParameter("row_count_only")
.AddOptionalParameter("report_scanner_stats")
.AddOptionalParameter("scan_batch_size")
+ .AddOptionalParameter("fault_tolerant")
.AddOptionalParameter("fill_cache")
.AddOptionalParameter("num_threads")
.AddOptionalParameter("predicates")
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 7bfe717bd..fa683e8b4 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -175,6 +175,7 @@ DEFINE_uint32(reserve_seconds, 604800,
"Reserve seconds before purging a soft-deleted table.");
DECLARE_bool(create_table);
+DECLARE_bool(fault_tolerant);
DECLARE_int32(create_table_replication_factor);
DECLARE_bool(row_count_only);
DECLARE_bool(show_scanner_stats);
@@ -813,6 +814,7 @@ Status CopyTable(const RunnerContext& context) {
TableScanner scanner(src_client, src_table_name, dst_client, dst_table_name);
scanner.SetOutput(&cout);
+ scanner.SetScanBatchSize(FLAGS_scan_batch_size);
return scanner.StartCopy();
}
@@ -1784,6 +1786,7 @@ unique_ptr<Mode> BuildTableMode() {
.AddOptionalParameter("row_count_only")
.AddOptionalParameter("report_scanner_stats")
.AddOptionalParameter("scan_batch_size")
+ .AddOptionalParameter("fault_tolerant")
.AddOptionalParameter("fill_cache")
.AddOptionalParameter("num_threads")
.AddOptionalParameter("predicates")
@@ -1805,8 +1808,11 @@ unique_ptr<Mode> BuildTableMode() {
.AddOptionalParameter("create_table_hash_bucket_nums")
.AddOptionalParameter("create_table_replication_factor")
.AddOptionalParameter("dst_table")
+ .AddOptionalParameter("fault_tolerant")
+ .AddOptionalParameter("fill_cache")
.AddOptionalParameter("num_threads")
.AddOptionalParameter("predicates")
+ .AddOptionalParameter("scan_batch_size")
.AddOptionalParameter("tablets")
.AddOptionalParameter("write_type")
.Build();