You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/04/06 02:15:56 UTC

[kudu] branch branch-1.17.x updated: [tools] update 'kudu table copy' CLI tool

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.17.x by this push:
     new 261ce7daa [tools] update 'kudu table copy' CLI tool
261ce7daa is described below

commit 261ce7daa83a716c2ad9b960b9c904f11bdb7a68
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Mar 6 19:09:34 2023 -0800

    [tools] update 'kudu table copy' CLI tool
    
    This patch adds 'insert_ignore' and 'upsert_ignore' write operations
    for the 'kudu table copy' CLI tool.  It also updates the implementation
    of TableScanner::CopyTask() and TableScanner::AddRow() to perform fewer
    string comparisons.
    
    Change-Id: I8061f510710a30019db62627c91fb4caf4a13d27
    Reviewed-on: http://gerrit.cloudera.org:8080/19595
    Tested-by: Kudu Jenkins
    Reviewed-by: Yingchun Lai <la...@apache.org>
    Reviewed-on: http://gerrit.cloudera.org:8080/19693
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/tools/kudu-tool-test.cc | 91 +++++++++++++++++++++++++++----------
 src/kudu/tools/table_scanner.cc  | 97 ++++++++++++++++++++++++++++++----------
 src/kudu/tools/table_scanner.h   | 17 +++----
 3 files changed, 146 insertions(+), 59 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 7714dcad6..92719b6f1 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -514,10 +514,12 @@ class ToolTest : public KuduTest {
   }
 
   enum class TableCopyMode {
-    INSERT_TO_EXIST_TABLE = 0,
-    INSERT_TO_NOT_EXIST_TABLE = 1,
-    UPSERT_TO_EXIST_TABLE = 2,
+    INSERT_TO_EXISTING_TABLE = 0,
+    INSERT_TO_NEW_TABLE = 1,
+    UPSERT_TO_EXISTING_TABLE = 2,
     COPY_SCHEMA_ONLY = 3,
+    INSERT_IGNORE_TO_EXISTING_TABLE = 4,
+    UPSERT_IGNORE_TO_EXISTING_TABLE = 5,
   };
 
   struct RunCopyTableCheckArgs {
@@ -532,7 +534,7 @@ class ToolTest : public KuduTest {
   };
 
   void RunCopyTableCheck(const RunCopyTableCheckArgs& args) {
-    const string kDstTableName = "kudu.table.copy.to";
+    static constexpr const char* const kDstTableName = "kudu.table.copy.to";
 
     // Prepare command flags, create destination table and write some data if needed.
     string write_type;
@@ -541,24 +543,49 @@ class ToolTest : public KuduTest {
     ww.set_table_name(kDstTableName);
     ww.set_num_replicas(1);
     switch (args.mode) {
-      case TableCopyMode::INSERT_TO_EXIST_TABLE:
+      case TableCopyMode::INSERT_TO_EXISTING_TABLE:
         write_type = "insert";
         create_table = "false";
         // Create the dst table.
         ww.set_num_write_threads(0);
         ww.Setup();
         break;
-      case TableCopyMode::INSERT_TO_NOT_EXIST_TABLE:
+      case TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE:
+        write_type = "insert_ignore";
+        create_table = "false";
+        // Create the dst table and write some data to it.
+        ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+        ww.set_num_write_threads(1);
+        ww.Setup();
+        ww.Start();
+        ASSERT_EVENTUALLY([&]() {
+          ASSERT_GE(ww.rows_inserted(), 100);
+        });
+        ww.StopAndJoin();
+        break;
+      case TableCopyMode::INSERT_TO_NEW_TABLE:
         write_type = "insert";
         create_table = "true";
         break;
-      case TableCopyMode::UPSERT_TO_EXIST_TABLE:
+      case TableCopyMode::UPSERT_TO_EXISTING_TABLE:
         write_type = "upsert";
         create_table = "false";
         // Create the dst table and write some data to it.
         ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
         ww.set_num_write_threads(1);
-        ww.set_write_batch_size(1);
+        ww.Setup();
+        ww.Start();
+        ASSERT_EVENTUALLY([&]() {
+          ASSERT_GE(ww.rows_inserted(), 100);
+        });
+        ww.StopAndJoin();
+        break;
+      case TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE:
+        write_type = "upsert_ignore";
+        create_table = "false";
+        // Create the dst table and write some data to it.
+        ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+        ww.set_num_write_threads(1);
         ww.Setup();
         ww.Start();
         ASSERT_EVENTUALLY([&]() {
@@ -628,7 +655,7 @@ class ToolTest : public KuduTest {
     }
 
     // Check schema equals when destination table is created automatically.
-    if (args.mode == TableCopyMode::INSERT_TO_NOT_EXIST_TABLE ||
+    if (args.mode == TableCopyMode::INSERT_TO_NEW_TABLE ||
         args.mode == TableCopyMode::COPY_SCHEMA_ONLY) {
       vector<string> src_schema;
       NO_FATALS(RunActionStdoutLines(
@@ -698,20 +725,26 @@ class ToolTest : public KuduTest {
       ASSERT_GE(dst_lines.size(), 1);
       ASSERT_STR_CONTAINS(*dst_lines.rbegin(), "Total count 0 ");
     } else {
-      // Rows scanned from source table can be found in destination table.
+      // Rows scanned from the source table can be found in the destination table.
       set<string> sorted_dst_lines(dst_lines.begin(), dst_lines.end());
-      for (auto src_line = src_lines.begin(); src_line != src_lines.end();) {
-        if (src_line->find("key") != string::npos) {
-          ASSERT_TRUE(ContainsKey(sorted_dst_lines, *src_line));
-          sorted_dst_lines.erase(*src_line);
+      for (auto src_line_it = src_lines.begin(); src_line_it != src_lines.end();) {
+        if (src_line_it->find("key") != string::npos) {
+          if (args.mode != TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE) {
+            ASSERT_TRUE(ContainsKey(sorted_dst_lines, *src_line_it)) << *src_line_it;
+          }
+          sorted_dst_lines.erase(*src_line_it);
         }
-        src_line = src_lines.erase(src_line);
+        src_line_it = src_lines.erase(src_line_it);
       }
 
-      // Under all modes except UPSERT_TO_EXIST_TABLE, destination table is empty before
-      // copying, that means destination table should have no more rows than source table
-      // after copying.
-      if (args.mode != TableCopyMode::UPSERT_TO_EXIST_TABLE) {
+      // Under all modes except for UPSERT_TO_EXISTING_TABLE,
+      // INSERT_IGNORE_TO_EXISTING_TABLE, UPSERT_IGNORE_TO_EXISTING_TABLE,
+      // the destination table is empty before copying. That means the
+      // destination table should not have more rows than the source table
+      // after copying is complete.
+      if (args.mode != TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE &&
+          args.mode != TableCopyMode::UPSERT_TO_EXISTING_TABLE &&
+          args.mode != TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE) {
         for (const auto& dst_line : sorted_dst_lines) {
           ASSERT_STR_NOT_CONTAINS(dst_line, "key");
         }
@@ -813,7 +846,9 @@ INSTANTIATE_TEST_SUITE_P(ToolTestKerberosParameterized, ToolTestKerberosParamete
 enum RunCopyTableCheckArgsType {
   kTestCopyTableDstTableExist,
   kTestCopyTableDstTableNotExist,
+  kTestCopyTableInsertIgnore,
   kTestCopyTableUpsert,
+  kTestCopyTableUpsertIgnore,
   kTestCopyTableSchemaOnly,
   kTestCopyTableComplexSchema,
   kTestCopyUnpartitionedTable,
@@ -879,16 +914,22 @@ class ToolTestCopyTableParameterized :
                                    1,
                                    total_rows_,
                                    kSimpleSchemaColumns,
-                                   TableCopyMode::INSERT_TO_EXIST_TABLE,
+                                   TableCopyMode::INSERT_TO_EXISTING_TABLE,
                                    -1 };
     switch (test_case_) {
       case kTestCopyTableDstTableExist:
         return { args };
       case kTestCopyTableDstTableNotExist:
-        args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
+        args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
+        return { args };
+      case kTestCopyTableInsertIgnore:
+        args.mode = TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE;
         return { args };
       case kTestCopyTableUpsert:
-        args.mode = TableCopyMode::UPSERT_TO_EXIST_TABLE;
+        args.mode = TableCopyMode::UPSERT_TO_EXISTING_TABLE;
+        return { args };
+      case kTestCopyTableUpsertIgnore:
+        args.mode = TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE;
         return { args };
       case kTestCopyTableSchemaOnly: {
         args.mode = TableCopyMode::COPY_SCHEMA_ONLY;
@@ -911,7 +952,7 @@ class ToolTestCopyTableParameterized :
       }
       case kTestCopyTableComplexSchema: {
         args.columns = kComplexSchemaColumns;
-        args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
+        args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
         vector<RunCopyTableCheckArgs> multi_args;
         {
           auto args_temp = args;
@@ -951,7 +992,7 @@ class ToolTestCopyTableParameterized :
         return multi_args;
       }
       case kTestCopyUnpartitionedTable: {
-        args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
+        args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
         vector<RunCopyTableCheckArgs> multi_args;
         {
           auto args_temp = args;
@@ -1176,7 +1217,9 @@ INSTANTIATE_TEST_SUITE_P(CopyTableParameterized,
                          ToolTestCopyTableParameterized,
                          ::testing::Values(kTestCopyTableDstTableExist,
                                            kTestCopyTableDstTableNotExist,
+                                           kTestCopyTableInsertIgnore,
                                            kTestCopyTableUpsert,
+                                           kTestCopyTableUpsertIgnore,
                                            kTestCopyTableSchemaOnly,
                                            kTestCopyTableComplexSchema,
                                            kTestCopyUnpartitionedTable,
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index c7ea9744c..8178c7379 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -129,9 +129,12 @@ DEFINE_bool(report_scanner_stats, false,
 DEFINE_bool(show_values, false,
             "Whether to show values of scanned rows.");
 DEFINE_string(write_type, "insert",
-              "How data should be copied to the destination table. Valid values are 'insert', "
-              "'upsert' or the empty string. If the empty string, data will not be copied "
-              "(useful when --create_table=true).");
+              "Write operation type to use when populating the destination "
+              "table with the rows from the source table. Choose from "
+              "'insert', 'insert_ignore', 'upsert', 'upsert_ignore', or an "
+              "empty string. Empty string means the data isn't going to be "
+              "copied, which is useful with --create_table=true when just "
+              "creating the destination table without copying the data.");
 DEFINE_string(replica_selection, "CLOSEST",
               "Replica selection for scan operations. Acceptable values are: "
               "CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and "
@@ -165,9 +168,20 @@ bool IsFlagValueAcceptable(const char* flag_name,
   return false;
 }
 
+constexpr const char* const kWriteTypeInsert = "insert";
+constexpr const char* const kWriteTypeInsertIgnore = "insert_ignore";
+constexpr const char* const kWriteTypeUpsert = "upsert";
+constexpr const char* const kWriteTypeUpsertIgnore = "upsert_ignore";
+
 bool ValidateWriteType(const char* flag_name,
                        const string& flag_value) {
-  static const vector<string> kWriteTypes = { "insert", "upsert", "" };
+  static const vector<string> kWriteTypes = {
+    "",
+    kWriteTypeInsert,
+    kWriteTypeInsertIgnore,
+    kWriteTypeUpsert,
+    kWriteTypeUpsertIgnore,
+  };
   return IsFlagValueAcceptable(flag_name, flag_value, kWriteTypes);
 }
 
@@ -535,7 +549,7 @@ Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>& src_table
   return Status::OK();
 }
 
-void CheckPendingErrors(const client::sp::shared_ptr<KuduSession>& session) {
+void CheckPendingErrors(KuduSession* session) {
   vector<KuduError*> errors;
   ElementDeleter d(&errors);
   session->GetPendingErrors(&errors, nullptr);
@@ -623,9 +637,24 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread
   } while (0)
 
   DCHECK(thread_status);
+  KuduWriteOperation::Type op_type;
+  const auto& op_type_str = FLAGS_write_type;
+  if (op_type_str == kWriteTypeInsert) {
+    op_type = KuduWriteOperation::INSERT;
+  } else if (op_type_str == kWriteTypeInsertIgnore) {
+    op_type = KuduWriteOperation::INSERT_IGNORE;
+  } else if (op_type_str == kWriteTypeUpsert) {
+    op_type = KuduWriteOperation::UPSERT;
+  } else if (op_type_str == kWriteTypeUpsertIgnore) {
+    op_type = KuduWriteOperation::UPSERT_IGNORE;
+  } else {
+    *thread_status = Status::InvalidArgument(Substitute(
+        "invalid write operation type: $0", op_type_str));
+    return;
+  }
+
   client::sp::shared_ptr<KuduTable> dst_table;
   TASK_RET_NOT_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table));
-  const auto& dst_table_schema = dst_table->schema();
 
   // One session per thread.
   client::sp::shared_ptr<KuduSession> session((*dst_client_)->NewSession());
@@ -633,13 +662,22 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread
   TASK_RET_NOT_OK(session->SetErrorBufferSpace(1024 * 1024));
   session->SetTimeoutMillis(FLAGS_timeout_ms);
 
-  *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
+  // The callback's lambda of ScanData() keeps references to the session and
+  // the destination table objects, making sure they are alive when the callback
+  // is invoked.
+  *thread_status = ScanData(tokens, [table = std::move(dst_table),
+                                     session = std::move(session),
+                                     op_type] (const KuduScanBatch& batch) {
+    auto* s_ptr = session.get();
+    auto* t_ptr = table.get();
     for (const auto& row : batch) {
-      RETURN_NOT_OK(AddRow(dst_table, dst_table_schema, row, session));
+      RETURN_NOT_OK(AddRow(s_ptr, t_ptr, row, op_type));
     }
-    // Flush here to make sure all write operations have been sent.
-    auto s = session->Flush();
-    CheckPendingErrors(session);
+    // Flush the session to make sure all write operations have been sent
+    // to the server. If any error happens, CheckPendingErrors() will report
+    // on them.
+    auto s = s_ptr->Flush();
+    CheckPendingErrors(s_ptr);
     return s;
   });
 
@@ -791,23 +829,34 @@ Status TableScanner::StartCopy() {
   return StartWork(WorkType::kCopy);
 }
 
-Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
-                            const KuduSchema& table_schema,
+Status TableScanner::AddRow(KuduSession* session,
+                            KuduTable* table,
                             const KuduScanBatch::RowPtr& src_row,
-                            const client::sp::shared_ptr<KuduSession>& session) {
+                            KuduWriteOperation::Type write_op_type) {
   unique_ptr<KuduWriteOperation> write_op;
-  if (FLAGS_write_type == "insert") {
-    write_op.reset(table->NewInsert());
-  } else if (FLAGS_write_type == "upsert") {
-    write_op.reset(table->NewUpsert());
-  } else {
-    LOG(FATAL) << Substitute("invalid write_type: $0", FLAGS_write_type);
+  switch (write_op_type) {
+    case KuduWriteOperation::INSERT:
+      write_op.reset(table->NewInsert());
+      break;
+    case KuduWriteOperation::INSERT_IGNORE:
+      write_op.reset(table->NewInsertIgnore());
+      break;
+    case KuduWriteOperation::UPSERT:
+      write_op.reset(table->NewUpsert());
+      break;
+    case KuduWriteOperation::UPSERT_IGNORE:
+      write_op.reset(table->NewUpsertIgnore());
+      break;
+    default:
+      return Status::InvalidArgument(
+          Substitute("unexpected op type: $0", write_op_type));
+      break;  // unreachable
   }
 
-  KuduPartialRow* dst_row = write_op->mutable_row();
-  size_t row_size = ContiguousRowHelper::row_size(*src_row.schema_);
-  memcpy(dst_row->row_data_, src_row.row_data_, row_size);
-  BitmapChangeBits(dst_row->isset_bitmap_, 0, table_schema.num_columns(), true);
+  auto* dst_row = write_op->mutable_row();
+  memcpy(dst_row->row_data_, src_row.row_data_,
+         ContiguousRowHelper::row_size(*src_row.schema_));
+  BitmapChangeBits(dst_row->isset_bitmap_, 0, table->schema().num_columns(), true);
 
   return session->Apply(write_op.release());
 }
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index b8608c64b..13dd39d39 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -21,24 +21,20 @@
 #include <cstdint>
 #include <functional>
 #include <iosfwd>
-#include <optional>
 #include <memory>
+#include <optional>
 #include <string>
 #include <vector>
 
 #include "kudu/client/client.h"
 #include "kudu/client/scan_batch.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/write_op.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
 #include "kudu/util/threadpool.h"
 
 namespace kudu {
-
-namespace client {
-class KuduSchema;
-} // namespace client
-
 namespace tools {
 
 // This class is not thread-safe.
@@ -78,11 +74,10 @@ class TableScanner {
     kCopy
   };
 
-  static Status AddRow(
-      const client::sp::shared_ptr<client::KuduTable>& table,
-      const client::KuduSchema& table_schema,
-      const client::KuduScanBatch::RowPtr& src_row,
-      const client::sp::shared_ptr<client::KuduSession>& session);
+  static Status AddRow(client::KuduSession* session,
+                       client::KuduTable* table,
+                       const client::KuduScanBatch::RowPtr& src_row,
+                       client::KuduWriteOperation::Type write_op_type);
 
   // Convert replica selection from string into the KuduClient::ReplicaSelection
   // enumerator.