You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/03/03 07:32:03 UTC

[kudu] branch master updated: [tools] exit gracefully on errors during table copying

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a58c5838 [tools] exit gracefully on errors during table copying
8a58c5838 is described below

commit 8a58c5838ec634146a0af46cdb47d4db404aef0a
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Mar 2 20:58:43 2023 -0800

    [tools] exit gracefully on errors during table copying
    
    Prior to this patch, a failure to write a batch of rows into the
    destination table would lead to a crash while running the
    `kudu table copy` CLI tool.  In addition, the information on errors
    would not be printed because CheckPendingErrors() was called before
    Session::Flush().
    
    This patch addresses these issues:
      * no crashes for non-OK status within TableScanner::CopyTask()
      * information on errors encountered during Session::Flush()
        is printed into the output stream
    
    This is a follow-up to 28f8e972fcd042a26a502cbb1f1102c487c9398d.
    
    Change-Id: I90dc29b5a3fb8334ec1f54425b84d13faeb19cd5
    Reviewed-on: http://gerrit.cloudera.org:8080/19575
    Reviewed-by: Yingchun Lai <la...@apache.org>
    Tested-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/tools/table_scanner.cc | 78 +++++++++++++++++++++++++----------------
 src/kudu/tools/table_scanner.h  |  4 +--
 2 files changed, 50 insertions(+), 32 deletions(-)

diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 614cce862..c7ea9744c 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -21,6 +21,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
+#include <functional>
 #include <iomanip>
 #include <iostream>
 #include <iterator>
@@ -77,6 +78,7 @@ using kudu::client::KuduValue;
 using kudu::client::KuduWriteOperation;
 using kudu::iequals;
 using std::endl;
+using std::function;
 using std::map;
 using std::nullopt;
 using std::optional;
@@ -557,9 +559,9 @@ TableScanner::TableScanner(
   CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
 }
 
-Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
-                              const std::function<void(const KuduScanBatch& batch)>& cb) {
-  for (auto token : tokens) {
+Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
+                              const function<Status(const KuduScanBatch& batch)>& cb) {
+  for (const auto* token : tokens) {
     Stopwatch sw(Stopwatch::THIS_THREAD);
     sw.start();
 
@@ -577,7 +579,7 @@ Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& t
       count += batch.NumRows();
       total_count_ += batch.NumRows();
       ++next_batch_calls;
-      cb(batch);
+      RETURN_NOT_OK(cb(batch));
     }
     sw.stop();
 
@@ -598,6 +600,7 @@ Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& t
 }
 
 void TableScanner::ScanTask(const vector<KuduScanToken*>& tokens, Status* thread_status) {
+  DCHECK(thread_status);
   *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
     if (out_ && FLAGS_show_values) {
       MutexLock l(output_lock_);
@@ -606,29 +609,41 @@ void TableScanner::ScanTask(const vector<KuduScanToken*>& tokens, Status* thread
       }
       out_->flush();
     }
+    return Status::OK();
   });
 }
 
 void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread_status) {
+#define TASK_RET_NOT_OK(s) do {   \
+    const Status& _s = (s);       \
+    if (PREDICT_FALSE(!_s.ok())) {\
+      *thread_status = _s;        \
+      return;                     \
+    }                             \
+  } while (0)
+
+  DCHECK(thread_status);
   client::sp::shared_ptr<KuduTable> dst_table;
-  CHECK_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table));
-  const KuduSchema& dst_table_schema = dst_table->schema();
+  TASK_RET_NOT_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table));
+  const auto& dst_table_schema = dst_table->schema();
 
   // One session per thread.
   client::sp::shared_ptr<KuduSession> session((*dst_client_)->NewSession());
-  CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
-  CHECK_OK(session->SetErrorBufferSpace(1024));
+  TASK_RET_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+  TASK_RET_NOT_OK(session->SetErrorBufferSpace(1024 * 1024));
   session->SetTimeoutMillis(FLAGS_timeout_ms);
 
   *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
     for (const auto& row : batch) {
-      CHECK_OK(AddRow(dst_table, dst_table_schema, row, session));
+      RETURN_NOT_OK(AddRow(dst_table, dst_table_schema, row, session));
     }
+    // Flush here to make sure all write operations have been sent.
+    auto s = session->Flush();
     CheckPendingErrors(session);
-    // Flush here to make sure all write operations have been sent,
-    // and all strings reference to batch are still valid.
-    CHECK_OK(session->Flush());
+    return s;
   });
+
+#undef TASK_RET_NOT_OK
 }
 
 void TableScanner::SetOutput(ostream* out) {
@@ -650,12 +665,12 @@ void TableScanner::SetScanBatchSize(int32_t scan_batch_size) {
   scan_batch_size_ = scan_batch_size;
 }
 
-Status TableScanner::StartWork(WorkType type) {
+Status TableScanner::StartWork(WorkType work_type) {
   client::sp::shared_ptr<KuduTable> src_table;
   RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
 
   // Create destination table if needed.
-  if (type == WorkType::kCopy) {
+  if (work_type == WorkType::kCopy) {
     RETURN_NOT_OK(CreateDstTableIfNeeded(src_table, *dst_client_, *dst_table_name_));
     if (FLAGS_write_type.empty()) {
       // Create table only.
@@ -686,7 +701,7 @@ Status TableScanner::StartWork(WorkType type) {
   }
 
   // Set projection if needed.
-  if (type == WorkType::kScan) {
+  if (work_type == WorkType::kScan) {
     const auto project_all = FLAGS_columns == "*" || FLAGS_columns.empty();
     if (!project_all || FLAGS_row_count_only) {
       vector<string> projected_column_names;
@@ -704,35 +719,36 @@ Status TableScanner::StartWork(WorkType type) {
   ElementDeleter deleter(&tokens);
   RETURN_NOT_OK(builder.Build(&tokens));
 
+  const int num_threads = FLAGS_num_threads;
+
   // Set tablet filter.
   const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
   map<int, vector<KuduScanToken*>> thread_tokens;
   int i = 0;
   for (auto* token : tokens) {
     if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
-      thread_tokens[i++ % FLAGS_num_threads].emplace_back(token);
+      thread_tokens[i++ % num_threads].emplace_back(token);
     }
   }
 
-  // Initialize statuses for each thread.
-  vector<Status> thread_statuses(FLAGS_num_threads);
-
   RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
-                  .set_max_threads(FLAGS_num_threads)
+                  .set_max_threads(num_threads)
                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))
                   .Build(&thread_pool_));
 
-  Status end_status = Status::OK();
+  // Initialize statuses for each thread.
+  vector<Status> thread_statuses(num_threads);
+
   Stopwatch sw(Stopwatch::THIS_THREAD);
   sw.start();
-  for (i = 0; i < FLAGS_num_threads; ++i) {
+  for (i = 0; i < num_threads; ++i) {
     auto* t_tokens = &thread_tokens[i];
     auto* t_status = &thread_statuses[i];
-    if (type == WorkType::kScan) {
+    if (work_type == WorkType::kScan) {
       RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
                                          { this->ScanTask(*t_tokens, t_status); }));
     } else {
-      CHECK(type == WorkType::kCopy);
+      DCHECK(work_type == WorkType::kCopy);
       RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
                                          { this->CopyTask(*t_tokens, t_status); }));
     }
@@ -748,18 +764,20 @@ Status TableScanner::StartWork(WorkType type) {
         << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
   }
 
-  for (i = 0; i < FLAGS_num_threads; ++i) {
-    if (!thread_statuses[i].ok()) {
+  const auto& operation = work_type == WorkType::kScan ? "Scanning" : "Copying";
+  Status result_status;
+  for (const auto& s : thread_statuses) {
+    if (!s.ok()) {
       if (out_) {
-        *out_ << "Scanning failed " << thread_statuses[i].ToString() << endl;
+        *out_ << operation << " failed: " << s.ToString() << endl;
       }
-      if (end_status.ok()) {
-        end_status = thread_statuses[i];
+      if (result_status.ok()) {
+        result_status = s;
       }
     }
   }
 
-  return end_status;
+  return result_status;
 }
 
 Status TableScanner::StartScan() {
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 440b62c64..b8608c64b 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -90,9 +90,9 @@ class TableScanner {
       const std::string& selection_str,
       client::KuduClient::ReplicaSelection* selection);
 
-  Status StartWork(WorkType type);
+  Status StartWork(WorkType work_type);
   Status ScanData(const std::vector<client::KuduScanToken*>& tokens,
-                  const std::function<void(const client::KuduScanBatch& batch)>& cb);
+                  const std::function<Status(const client::KuduScanBatch& batch)>& cb);
   void ScanTask(const std::vector<client::KuduScanToken*>& tokens,
                 Status* thread_status);
   void CopyTask(const std::vector<client::KuduScanToken*>& tokens,