You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/06/27 18:00:35 UTC
[kudu] branch master updated: KUDU-2851: modify table scan and copy
tools to surface errors
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ac8462f KUDU-2851: modify table scan and copy tools to surface errors
ac8462f is described below
commit ac8462f55715d7bf40770f6553bf52805a7d451d
Author: hannahvnguyen <ha...@cloudera.com>
AuthorDate: Tue Jun 25 17:17:11 2019 -0700
KUDU-2851: modify table scan and copy tools to surface errors
- Initialized a Status for each thread.
- Replaced CHECKs in ScanData() with return of bad Status.
- StartWork() logs all bad Statuses, and returns the first bad Status
from the thread pool.
Change-Id: Ic45da537b8bacfa9625010536ea82da9a6e76100
Reviewed-on: http://gerrit.cloudera.org:8080/13733
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/tools/kudu-tool-test.cc | 55 ++++++++++++++++++++++++++++++++++++++++
src/kudu/tools/table_scanner.cc | 40 ++++++++++++++++++++---------
src/kudu/tools/table_scanner.h | 8 +++---
3 files changed, 87 insertions(+), 16 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 5ac2c4a..0732542 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -5025,6 +5025,61 @@ TEST_F(AuthzTServerChecksumTest, TestAuthorizeChecksum) {
"--checksum_scan"
};
ASSERT_OK(RunKuduTool(checksum_args));
+
+}
+
+// Regression test for KUDU-2851.
+TEST_F(ToolTest, TestFailedTableScan) {
+ // Create a table using the loadgen tool.
+ const string kTableName = "db.table";
+ NO_FATALS(RunLoadgen(/*num_tservers*/1, /*tool_args*/{},kTableName));
+
+ // Now shut down the tablet servers so the scans cannot proceed.
+ // Upon running the scan tool, we should get a TimedOut status.
+ NO_FATALS(cluster_->ShutdownNodes(cluster::ClusterNodes::TS_ONLY));
+
+ // When the scan tool hits an error, it should report the error
+ // instead of crashing.
+ string stdout;
+ string stderr;
+ Status s = RunTool(Substitute("perf table_scan $0 $1 -num_threads=2",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ kTableName),
+ &stdout, &stderr, nullptr, nullptr);
+
+ ASSERT_TRUE(s.IsRuntimeError());
+ SCOPED_TRACE(stderr);
+ ASSERT_STR_CONTAINS(stderr, "Timed out");
+
+}
+
+TEST_F(ToolTest, TestFailedTableCopy) {
+ // Create a table using the loadgen tool.
+ const string kTableName = "db.table";
+ NO_FATALS(RunLoadgen(/*num_tservers*/1, /*tool_args*/{},kTableName));
+
+ // Name of the destination table for the copy.
+ const string kDstTableName = "kudu.table.copy.to";
+
+ // Now shut down the tablet servers so the scans cannot proceed.
+ // Upon running the copy tool, we should get a TimedOut status.
+ NO_FATALS(cluster_->ShutdownNodes(cluster::ClusterNodes::TS_ONLY));
+
+ // When the copy tool hits an error, it should report the error
+ // instead of crashing.
+ string stdout;
+ string stderr;
+ Status s = RunTool(Substitute("table copy $0 $1 $2 -dst_table=$3",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ kTableName,
+ cluster_->master()->bound_rpc_addr().ToString(),
+ kDstTableName),
+ &stdout, &stderr, nullptr, nullptr);
+
+ ASSERT_TRUE(s.IsRuntimeError());
+ SCOPED_TRACE(stderr);
+ ASSERT_STR_CONTAINS(stderr, "Timed out");
+
}
} // namespace tools
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index bd87902..4924f30 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -463,21 +463,23 @@ Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
return session->Apply(write_op.release());
}
-void TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
- const std::function<void(const KuduScanBatch& batch)>& cb) {
+Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
+ const std::function<void(const KuduScanBatch& batch)>& cb) {
+
for (auto token : tokens) {
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
KuduScanner* scanner_ptr;
- CHECK_OK(token->IntoKuduScanner(&scanner_ptr));
+ RETURN_NOT_OK(token->IntoKuduScanner(&scanner_ptr));
+
unique_ptr<KuduScanner> scanner(scanner_ptr);
- CHECK_OK(scanner->Open());
+ RETURN_NOT_OK(scanner->Open());
uint64_t count = 0;
while (scanner->HasMoreRows()) {
KuduScanBatch batch;
- CHECK_OK(scanner->NextBatch(&batch));
+ RETURN_NOT_OK(scanner->NextBatch(&batch));
count += batch.NumRows();
total_count_.IncrementBy(batch.NumRows());
cb(batch);
@@ -490,10 +492,13 @@ void TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tok
<< " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
}
}
+
+ return Status::OK();
+
}
-void TableScanner::ScanTask(const vector<KuduScanToken *>& tokens) {
- ScanData(tokens, [&](const KuduScanBatch& batch) {
+void TableScanner::ScanTask(const vector<KuduScanToken *>& tokens, Status* thread_status) {
+ *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
if (out_ && FLAGS_show_values) {
MutexLock l(output_lock_);
for (const auto& row : batch) {
@@ -503,7 +508,7 @@ void TableScanner::ScanTask(const vector<KuduScanToken *>& tokens) {
});
}
-void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens) {
+void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread_status) {
client::sp::shared_ptr<KuduTable> dst_table;
CHECK_OK(dst_client_.get()->OpenTable(*dst_table_name_, &dst_table));
const KuduSchema& dst_table_schema = dst_table->schema();
@@ -514,7 +519,7 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens) {
CHECK_OK(session->SetErrorBufferSpace(1024));
session->SetTimeoutMillis(30000);
- ScanData(tokens, [&](const KuduScanBatch& batch) {
+ *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
for (const auto& row : batch) {
CHECK_OK(AddRow(dst_table, dst_table_schema, row, session));
}
@@ -591,21 +596,25 @@ Status TableScanner::StartWork(WorkType type) {
}
}
+ // Initialize statuses for each thread.
+ vector<Status> thread_statuses(FLAGS_num_threads);
+
RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
.set_max_threads(FLAGS_num_threads + 1) // add extra 1 thread for MonitorTask
.set_idle_timeout(MonoDelta::FromMilliseconds(1))
.Build(&thread_pool_));
+ Status end_status = Status::OK();
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
for (i = 0; i < FLAGS_num_threads; ++i) {
if (type == WorkType::kScan) {
RETURN_NOT_OK(thread_pool_->SubmitFunc(
- boost::bind(&TableScanner::ScanTask, this, thread_tokens[i])));
+ boost::bind(&TableScanner::ScanTask, this, thread_tokens[i], &thread_statuses[i])));
} else {
CHECK(type == WorkType::kCopy);
RETURN_NOT_OK(thread_pool_->SubmitFunc(
- boost::bind(&TableScanner::CopyTask, this, thread_tokens[i])));
+ boost::bind(&TableScanner::CopyTask, this, thread_tokens[i], &thread_statuses[i])));
}
}
RETURN_NOT_OK(thread_pool_->SubmitFunc(boost::bind(&TableScanner::MonitorTask, this)));
@@ -618,7 +627,14 @@ Status TableScanner::StartWork(WorkType type) {
<< " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
}
- return Status::OK();
+ for (i = 0; i < FLAGS_num_threads; ++i) {
+ if (!thread_statuses[i].ok()) {
+ if (out_) *out_ << "Scanning failed " << thread_statuses[i].ToString() << endl;
+ if (end_status.ok()) end_status = thread_statuses[i];
+ }
+ }
+
+ return end_status;
}
Status TableScanner::StartScan() {
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 7b3377b..17c75d2 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -80,10 +80,10 @@ class TableScanner {
};
Status StartWork(WorkType type);
- void ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
- const std::function<void(const kudu::client::KuduScanBatch& batch)>& cb);
- void ScanTask(const std::vector<kudu::client::KuduScanToken*>& tokens);
- void CopyTask(const std::vector<kudu::client::KuduScanToken*>& tokens);
+ Status ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
+ const std::function<void(const kudu::client::KuduScanBatch& batch)>& cb);
+ void ScanTask(const std::vector<kudu::client::KuduScanToken*>& tokens, Status* thread_status);
+ void CopyTask(const std::vector<kudu::client::KuduScanToken*>& tokens, Status* thread_status);
void MonitorTask();
Status AddRow(const client::sp::shared_ptr<kudu::client::KuduTable>& table,