You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/03/03 07:32:03 UTC
[kudu] branch master updated: [tools] exit gracefully on errors during table copying
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8a58c5838 [tools] exit gracefully on errors during table copying
8a58c5838 is described below
commit 8a58c5838ec634146a0af46cdb47d4db404aef0a
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Mar 2 20:58:43 2023 -0800
[tools] exit gracefully on errors during table copying
Prior to this patch, a failure to write a batch of rows into the
destination table would lead to a crash while running the `kudu table copy`
CLI tool. In addition, information about the errors would not be printed
because CheckPendingErrors() was called before Session::Flush().
This patch addresses these issues:
* no crashes for non-OK status within TableScanner::CopyTask()
* information on errors encountered during Session::Flush()
is printed into the output stream
This is a follow-up to 28f8e972fcd042a26a502cbb1f1102c487c9398d.
Change-Id: I90dc29b5a3fb8334ec1f54425b84d13faeb19cd5
Reviewed-on: http://gerrit.cloudera.org:8080/19575
Reviewed-by: Yingchun Lai <la...@apache.org>
Tested-by: Alexey Serbin <al...@apache.org>
---
src/kudu/tools/table_scanner.cc | 78 +++++++++++++++++++++++++----------------
src/kudu/tools/table_scanner.h | 4 +--
2 files changed, 50 insertions(+), 32 deletions(-)
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 614cce862..c7ea9744c 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <cstring>
+#include <functional>
#include <iomanip>
#include <iostream>
#include <iterator>
@@ -77,6 +78,7 @@ using kudu::client::KuduValue;
using kudu::client::KuduWriteOperation;
using kudu::iequals;
using std::endl;
+using std::function;
using std::map;
using std::nullopt;
using std::optional;
@@ -557,9 +559,9 @@ TableScanner::TableScanner(
CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
}
-Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
- const std::function<void(const KuduScanBatch& batch)>& cb) {
- for (auto token : tokens) {
+Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
+ const function<Status(const KuduScanBatch& batch)>& cb) {
+ for (const auto* token : tokens) {
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
@@ -577,7 +579,7 @@ Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& t
count += batch.NumRows();
total_count_ += batch.NumRows();
++next_batch_calls;
- cb(batch);
+ RETURN_NOT_OK(cb(batch));
}
sw.stop();
@@ -598,6 +600,7 @@ Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& t
}
void TableScanner::ScanTask(const vector<KuduScanToken*>& tokens, Status* thread_status) {
+ DCHECK(thread_status);
*thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
if (out_ && FLAGS_show_values) {
MutexLock l(output_lock_);
@@ -606,29 +609,41 @@ void TableScanner::ScanTask(const vector<KuduScanToken*>& tokens, Status* thread
}
out_->flush();
}
+ return Status::OK();
});
}
void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread_status) {
+#define TASK_RET_NOT_OK(s) do { \
+ const Status& _s = (s); \
+ if (PREDICT_FALSE(!_s.ok())) {\
+ *thread_status = _s; \
+ return; \
+ } \
+ } while (0)
+
+ DCHECK(thread_status);
client::sp::shared_ptr<KuduTable> dst_table;
- CHECK_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table));
- const KuduSchema& dst_table_schema = dst_table->schema();
+ TASK_RET_NOT_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table));
+ const auto& dst_table_schema = dst_table->schema();
// One session per thread.
client::sp::shared_ptr<KuduSession> session((*dst_client_)->NewSession());
- CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
- CHECK_OK(session->SetErrorBufferSpace(1024));
+ TASK_RET_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+ TASK_RET_NOT_OK(session->SetErrorBufferSpace(1024 * 1024));
session->SetTimeoutMillis(FLAGS_timeout_ms);
*thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
for (const auto& row : batch) {
- CHECK_OK(AddRow(dst_table, dst_table_schema, row, session));
+ RETURN_NOT_OK(AddRow(dst_table, dst_table_schema, row, session));
}
+ // Flush here to make sure all write operations have been sent.
+ auto s = session->Flush();
CheckPendingErrors(session);
- // Flush here to make sure all write operations have been sent,
- // and all strings reference to batch are still valid.
- CHECK_OK(session->Flush());
+ return s;
});
+
+#undef TASK_RET_NOT_OK
}
void TableScanner::SetOutput(ostream* out) {
@@ -650,12 +665,12 @@ void TableScanner::SetScanBatchSize(int32_t scan_batch_size) {
scan_batch_size_ = scan_batch_size;
}
-Status TableScanner::StartWork(WorkType type) {
+Status TableScanner::StartWork(WorkType work_type) {
client::sp::shared_ptr<KuduTable> src_table;
RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
// Create destination table if needed.
- if (type == WorkType::kCopy) {
+ if (work_type == WorkType::kCopy) {
RETURN_NOT_OK(CreateDstTableIfNeeded(src_table, *dst_client_, *dst_table_name_));
if (FLAGS_write_type.empty()) {
// Create table only.
@@ -686,7 +701,7 @@ Status TableScanner::StartWork(WorkType type) {
}
// Set projection if needed.
- if (type == WorkType::kScan) {
+ if (work_type == WorkType::kScan) {
const auto project_all = FLAGS_columns == "*" || FLAGS_columns.empty();
if (!project_all || FLAGS_row_count_only) {
vector<string> projected_column_names;
@@ -704,35 +719,36 @@ Status TableScanner::StartWork(WorkType type) {
ElementDeleter deleter(&tokens);
RETURN_NOT_OK(builder.Build(&tokens));
+ const int num_threads = FLAGS_num_threads;
+
// Set tablet filter.
const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
map<int, vector<KuduScanToken*>> thread_tokens;
int i = 0;
for (auto* token : tokens) {
if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
- thread_tokens[i++ % FLAGS_num_threads].emplace_back(token);
+ thread_tokens[i++ % num_threads].emplace_back(token);
}
}
- // Initialize statuses for each thread.
- vector<Status> thread_statuses(FLAGS_num_threads);
-
RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
- .set_max_threads(FLAGS_num_threads)
+ .set_max_threads(num_threads)
.set_idle_timeout(MonoDelta::FromMilliseconds(1))
.Build(&thread_pool_));
- Status end_status = Status::OK();
+ // Initialize statuses for each thread.
+ vector<Status> thread_statuses(num_threads);
+
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
- for (i = 0; i < FLAGS_num_threads; ++i) {
+ for (i = 0; i < num_threads; ++i) {
auto* t_tokens = &thread_tokens[i];
auto* t_status = &thread_statuses[i];
- if (type == WorkType::kScan) {
+ if (work_type == WorkType::kScan) {
RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
{ this->ScanTask(*t_tokens, t_status); }));
} else {
- CHECK(type == WorkType::kCopy);
+ DCHECK(work_type == WorkType::kCopy);
RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
{ this->CopyTask(*t_tokens, t_status); }));
}
@@ -748,18 +764,20 @@ Status TableScanner::StartWork(WorkType type) {
<< " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
}
- for (i = 0; i < FLAGS_num_threads; ++i) {
- if (!thread_statuses[i].ok()) {
+ const auto& operation = work_type == WorkType::kScan ? "Scanning" : "Copying";
+ Status result_status;
+ for (const auto& s : thread_statuses) {
+ if (!s.ok()) {
if (out_) {
- *out_ << "Scanning failed " << thread_statuses[i].ToString() << endl;
+ *out_ << operation << " failed: " << s.ToString() << endl;
}
- if (end_status.ok()) {
- end_status = thread_statuses[i];
+ if (result_status.ok()) {
+ result_status = s;
}
}
}
- return end_status;
+ return result_status;
}
Status TableScanner::StartScan() {
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 440b62c64..b8608c64b 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -90,9 +90,9 @@ class TableScanner {
const std::string& selection_str,
client::KuduClient::ReplicaSelection* selection);
- Status StartWork(WorkType type);
+ Status StartWork(WorkType work_type);
Status ScanData(const std::vector<client::KuduScanToken*>& tokens,
- const std::function<void(const client::KuduScanBatch& batch)>& cb);
+ const std::function<Status(const client::KuduScanBatch& batch)>& cb);
void ScanTask(const std::vector<client::KuduScanToken*>& tokens,
Status* thread_status);
void CopyTask(const std::vector<client::KuduScanToken*>& tokens,