You are viewing a plain-text version of this content. Its canonical link (the "here" hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/06/27 18:00:35 UTC

[kudu] branch master updated: KUDU-2851: modify table scan and copy tools to surface errors

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ac8462f  KUDU-2851: modify table scan and copy tools to surface errors
ac8462f is described below

commit ac8462f55715d7bf40770f6553bf52805a7d451d
Author: hannahvnguyen <ha...@cloudera.com>
AuthorDate: Tue Jun 25 17:17:11 2019 -0700

    KUDU-2851: modify table scan and copy tools to surface errors
    
    - Initialized a Status for each scan/copy worker thread.
    - Replaced CHECKs in ScanData() with returning the failing Status.
    - StartWork() reports every non-OK Status and returns the first
      non-OK Status (in thread order) from the thread pool.
    
    Change-Id: Ic45da537b8bacfa9625010536ea82da9a6e76100
    Reviewed-on: http://gerrit.cloudera.org:8080/13733
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tools/kudu-tool-test.cc | 55 ++++++++++++++++++++++++++++++++++++++++
 src/kudu/tools/table_scanner.cc  | 40 ++++++++++++++++++++---------
 src/kudu/tools/table_scanner.h   |  8 +++---
 3 files changed, 87 insertions(+), 16 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 5ac2c4a..0732542 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -5025,6 +5025,61 @@ TEST_F(AuthzTServerChecksumTest, TestAuthorizeChecksum) {
     "--checksum_scan"
   };
   ASSERT_OK(RunKuduTool(checksum_args));
+
+}
+
+// Regression test for KUDU-2851.
+TEST_F(ToolTest, TestFailedTableScan) {
+  // Create a table using the loadgen tool.
+  const string kTableName = "db.table";
+  NO_FATALS(RunLoadgen(/*num_tservers*/1, /*tool_args*/{},kTableName));
+
+  // Now shut down the tablet servers so the scans cannot proceed.
+  // Upon running the scan tool, we should get a TimedOut status.
+  NO_FATALS(cluster_->ShutdownNodes(cluster::ClusterNodes::TS_ONLY));
+
+  // Getting an error when running the scan tool should report the errors
+  // instead of crashing.
+  string stdout;
+  string stderr;
+  Status s = RunTool(Substitute("perf table_scan $0 $1 -num_threads=2",
+                                cluster_->master()->bound_rpc_addr().ToString(),
+                                kTableName),
+                     &stdout, &stderr, nullptr, nullptr);
+
+  ASSERT_TRUE(s.IsRuntimeError());
+  SCOPED_TRACE(stderr);
+  ASSERT_STR_CONTAINS(stderr, "Timed out");
+
+}
+
+TEST_F(ToolTest, TestFailedTableCopy) {
+  // Create a table using the loadgen tool.
+  const string kTableName = "db.table";
+  NO_FATALS(RunLoadgen(/*num_tservers*/1, /*tool_args*/{},kTableName));
+
+  // Name of the destination table.
+  const string kDstTableName = "kudu.table.copy.to";
+
+  // Now shut down the tablet servers so the scans cannot proceed.
+  // Upon running the copy tool, we should get a TimedOut status.
+  NO_FATALS(cluster_->ShutdownNodes(cluster::ClusterNodes::TS_ONLY));
+
+  // Getting an error when running the copy tool should report the errors
+  // instead of crashing.
+  string stdout;
+  string stderr;
+  Status s = RunTool(Substitute("table copy $0 $1 $2 -dst_table=$3",
+                                cluster_->master()->bound_rpc_addr().ToString(),
+                                kTableName,
+                                cluster_->master()->bound_rpc_addr().ToString(),
+                                kDstTableName),
+                     &stdout, &stderr, nullptr, nullptr);
+
+  ASSERT_TRUE(s.IsRuntimeError());
+  SCOPED_TRACE(stderr);
+  ASSERT_STR_CONTAINS(stderr, "Timed out");
+
 }
 
 } // namespace tools
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index bd87902..4924f30 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -463,21 +463,23 @@ Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
   return session->Apply(write_op.release());
 }
 
-void TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
-                            const std::function<void(const KuduScanBatch& batch)>& cb) {
+Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
+                              const std::function<void(const KuduScanBatch& batch)>& cb) {
+
   for (auto token : tokens) {
     Stopwatch sw(Stopwatch::THIS_THREAD);
     sw.start();
 
     KuduScanner* scanner_ptr;
-    CHECK_OK(token->IntoKuduScanner(&scanner_ptr));
+    RETURN_NOT_OK(token->IntoKuduScanner(&scanner_ptr));
+
     unique_ptr<KuduScanner> scanner(scanner_ptr);
-    CHECK_OK(scanner->Open());
+    RETURN_NOT_OK(scanner->Open());
 
     uint64_t count = 0;
     while (scanner->HasMoreRows()) {
       KuduScanBatch batch;
-      CHECK_OK(scanner->NextBatch(&batch));
+      RETURN_NOT_OK(scanner->NextBatch(&batch));
       count += batch.NumRows();
       total_count_.IncrementBy(batch.NumRows());
       cb(batch);
@@ -490,10 +492,13 @@ void TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tok
            << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
     }
   }
+
+  return Status::OK();
+
 }
 
-void TableScanner::ScanTask(const vector<KuduScanToken *>& tokens) {
-  ScanData(tokens, [&](const KuduScanBatch& batch) {
+void TableScanner::ScanTask(const vector<KuduScanToken *>& tokens, Status* thread_status) {
+  *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
     if (out_ && FLAGS_show_values) {
       MutexLock l(output_lock_);
       for (const auto& row : batch) {
@@ -503,7 +508,7 @@ void TableScanner::ScanTask(const vector<KuduScanToken *>& tokens) {
   });
 }
 
-void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens) {
+void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread_status) {
   client::sp::shared_ptr<KuduTable> dst_table;
   CHECK_OK(dst_client_.get()->OpenTable(*dst_table_name_, &dst_table));
   const KuduSchema& dst_table_schema = dst_table->schema();
@@ -514,7 +519,7 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens) {
   CHECK_OK(session->SetErrorBufferSpace(1024));
   session->SetTimeoutMillis(30000);
 
-  ScanData(tokens, [&](const KuduScanBatch& batch) {
+  *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
     for (const auto& row : batch) {
       CHECK_OK(AddRow(dst_table, dst_table_schema, row, session));
     }
@@ -591,21 +596,25 @@ Status TableScanner::StartWork(WorkType type) {
     }
   }
 
+  // Initialize statuses for each thread.
+  vector<Status> thread_statuses(FLAGS_num_threads);
+
   RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
                   .set_max_threads(FLAGS_num_threads + 1)  // add extra 1 thread for MonitorTask
                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))
                   .Build(&thread_pool_));
 
+  Status end_status = Status::OK();
   Stopwatch sw(Stopwatch::THIS_THREAD);
   sw.start();
   for (i = 0; i < FLAGS_num_threads; ++i) {
     if (type == WorkType::kScan) {
       RETURN_NOT_OK(thread_pool_->SubmitFunc(
-        boost::bind(&TableScanner::ScanTask, this, thread_tokens[i])));
+        boost::bind(&TableScanner::ScanTask, this, thread_tokens[i], &thread_statuses[i])));
     } else {
       CHECK(type == WorkType::kCopy);
       RETURN_NOT_OK(thread_pool_->SubmitFunc(
-        boost::bind(&TableScanner::CopyTask, this, thread_tokens[i])));
+        boost::bind(&TableScanner::CopyTask, this, thread_tokens[i], &thread_statuses[i])));
     }
   }
   RETURN_NOT_OK(thread_pool_->SubmitFunc(boost::bind(&TableScanner::MonitorTask, this)));
@@ -618,7 +627,14 @@ Status TableScanner::StartWork(WorkType type) {
         << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
   }
 
-  return Status::OK();
+  for (i = 0; i < FLAGS_num_threads; ++i) {
+    if (!thread_statuses[i].ok()) {
+      if (out_) *out_ << "Scanning failed " << thread_statuses[i].ToString() << endl;
+      if (end_status.ok()) end_status = thread_statuses[i];
+    }
+  }
+
+  return end_status;
 }
 
 Status TableScanner::StartScan() {
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 7b3377b..17c75d2 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -80,10 +80,10 @@ class TableScanner {
   };
 
   Status StartWork(WorkType type);
-  void ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
-                const std::function<void(const kudu::client::KuduScanBatch& batch)>& cb);
-  void ScanTask(const std::vector<kudu::client::KuduScanToken*>& tokens);
-  void CopyTask(const std::vector<kudu::client::KuduScanToken*>& tokens);
+  Status ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
+                  const std::function<void(const kudu::client::KuduScanBatch& batch)>& cb);
+  void ScanTask(const std::vector<kudu::client::KuduScanToken*>& tokens, Status* thread_status);
+  void CopyTask(const std::vector<kudu::client::KuduScanToken*>& tokens, Status* thread_status);
   void MonitorTask();
 
   Status AddRow(const client::sp::shared_ptr<kudu::client::KuduTable>& table,