You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/04/06 02:15:56 UTC
[kudu] branch branch-1.17.x updated: [tools] update 'kudu table copy' CLI tool
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.17.x by this push:
new 261ce7daa [tools] update 'kudu table copy' CLI tool
261ce7daa is described below
commit 261ce7daa83a716c2ad9b960b9c904f11bdb7a68
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Mar 6 19:09:34 2023 -0800
[tools] update 'kudu table copy' CLI tool
This patch adds 'insert_ignore' and 'upsert_ignore' write operations
for the 'kudu table copy' CLI tool. It also updates the implementation
of TableScanner::CopyTask() and TableScanner::AddRow() to perform fewer
string comparisons.
Change-Id: I8061f510710a30019db62627c91fb4caf4a13d27
Reviewed-on: http://gerrit.cloudera.org:8080/19595
Tested-by: Kudu Jenkins
Reviewed-by: Yingchun Lai <la...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/19693
Reviewed-by: Alexey Serbin <al...@apache.org>
---
src/kudu/tools/kudu-tool-test.cc | 91 +++++++++++++++++++++++++++----------
src/kudu/tools/table_scanner.cc | 97 ++++++++++++++++++++++++++++++----------
src/kudu/tools/table_scanner.h | 17 +++----
3 files changed, 146 insertions(+), 59 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 7714dcad6..92719b6f1 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -514,10 +514,12 @@ class ToolTest : public KuduTest {
}
enum class TableCopyMode {
- INSERT_TO_EXIST_TABLE = 0,
- INSERT_TO_NOT_EXIST_TABLE = 1,
- UPSERT_TO_EXIST_TABLE = 2,
+ INSERT_TO_EXISTING_TABLE = 0,
+ INSERT_TO_NEW_TABLE = 1,
+ UPSERT_TO_EXISTING_TABLE = 2,
COPY_SCHEMA_ONLY = 3,
+ INSERT_IGNORE_TO_EXISTING_TABLE = 4,
+ UPSERT_IGNORE_TO_EXISTING_TABLE = 5,
};
struct RunCopyTableCheckArgs {
@@ -532,7 +534,7 @@ class ToolTest : public KuduTest {
};
void RunCopyTableCheck(const RunCopyTableCheckArgs& args) {
- const string kDstTableName = "kudu.table.copy.to";
+ static constexpr const char* const kDstTableName = "kudu.table.copy.to";
// Prepare command flags, create destination table and write some data if needed.
string write_type;
@@ -541,24 +543,49 @@ class ToolTest : public KuduTest {
ww.set_table_name(kDstTableName);
ww.set_num_replicas(1);
switch (args.mode) {
- case TableCopyMode::INSERT_TO_EXIST_TABLE:
+ case TableCopyMode::INSERT_TO_EXISTING_TABLE:
write_type = "insert";
create_table = "false";
// Create the dst table.
ww.set_num_write_threads(0);
ww.Setup();
break;
- case TableCopyMode::INSERT_TO_NOT_EXIST_TABLE:
+ case TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE:
+ write_type = "insert_ignore";
+ create_table = "false";
+ // Create the dst table and write some data to it.
+ ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+ ww.set_num_write_threads(1);
+ ww.Setup();
+ ww.Start();
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_GE(ww.rows_inserted(), 100);
+ });
+ ww.StopAndJoin();
+ break;
+ case TableCopyMode::INSERT_TO_NEW_TABLE:
write_type = "insert";
create_table = "true";
break;
- case TableCopyMode::UPSERT_TO_EXIST_TABLE:
+ case TableCopyMode::UPSERT_TO_EXISTING_TABLE:
write_type = "upsert";
create_table = "false";
// Create the dst table and write some data to it.
ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
ww.set_num_write_threads(1);
- ww.set_write_batch_size(1);
+ ww.Setup();
+ ww.Start();
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_GE(ww.rows_inserted(), 100);
+ });
+ ww.StopAndJoin();
+ break;
+ case TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE:
+ write_type = "upsert_ignore";
+ create_table = "false";
+ // Create the dst table and write some data to it.
+ ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+ ww.set_num_write_threads(1);
ww.Setup();
ww.Start();
ASSERT_EVENTUALLY([&]() {
@@ -628,7 +655,7 @@ class ToolTest : public KuduTest {
}
// Check schema equals when destination table is created automatically.
- if (args.mode == TableCopyMode::INSERT_TO_NOT_EXIST_TABLE ||
+ if (args.mode == TableCopyMode::INSERT_TO_NEW_TABLE ||
args.mode == TableCopyMode::COPY_SCHEMA_ONLY) {
vector<string> src_schema;
NO_FATALS(RunActionStdoutLines(
@@ -698,20 +725,26 @@ class ToolTest : public KuduTest {
ASSERT_GE(dst_lines.size(), 1);
ASSERT_STR_CONTAINS(*dst_lines.rbegin(), "Total count 0 ");
} else {
- // Rows scanned from source table can be found in destination table.
+ // Rows scanned from the source table can be found in the destination table.
set<string> sorted_dst_lines(dst_lines.begin(), dst_lines.end());
- for (auto src_line = src_lines.begin(); src_line != src_lines.end();) {
- if (src_line->find("key") != string::npos) {
- ASSERT_TRUE(ContainsKey(sorted_dst_lines, *src_line));
- sorted_dst_lines.erase(*src_line);
+ for (auto src_line_it = src_lines.begin(); src_line_it != src_lines.end();) {
+ if (src_line_it->find("key") != string::npos) {
+ if (args.mode != TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE) {
+ ASSERT_TRUE(ContainsKey(sorted_dst_lines, *src_line_it)) << *src_line_it;
+ }
+ sorted_dst_lines.erase(*src_line_it);
}
- src_line = src_lines.erase(src_line);
+ src_line_it = src_lines.erase(src_line_it);
}
- // Under all modes except UPSERT_TO_EXIST_TABLE, destination table is empty before
- // copying, that means destination table should have no more rows than source table
- // after copying.
- if (args.mode != TableCopyMode::UPSERT_TO_EXIST_TABLE) {
+ // Under all modes except for UPSERT_TO_EXISTING_TABLE,
+ // INSERT_IGNORE_TO_EXISTING_TABLE, and UPSERT_IGNORE_TO_EXISTING_TABLE,
+ // the destination table is empty before copying. That means the
+ // destination table should not have more rows than the source table
+ // after copying is complete.
+ if (args.mode != TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE &&
+ args.mode != TableCopyMode::UPSERT_TO_EXISTING_TABLE &&
+ args.mode != TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE) {
for (const auto& dst_line : sorted_dst_lines) {
ASSERT_STR_NOT_CONTAINS(dst_line, "key");
}
@@ -813,7 +846,9 @@ INSTANTIATE_TEST_SUITE_P(ToolTestKerberosParameterized, ToolTestKerberosParamete
enum RunCopyTableCheckArgsType {
kTestCopyTableDstTableExist,
kTestCopyTableDstTableNotExist,
+ kTestCopyTableInsertIgnore,
kTestCopyTableUpsert,
+ kTestCopyTableUpsertIgnore,
kTestCopyTableSchemaOnly,
kTestCopyTableComplexSchema,
kTestCopyUnpartitionedTable,
@@ -879,16 +914,22 @@ class ToolTestCopyTableParameterized :
1,
total_rows_,
kSimpleSchemaColumns,
- TableCopyMode::INSERT_TO_EXIST_TABLE,
+ TableCopyMode::INSERT_TO_EXISTING_TABLE,
-1 };
switch (test_case_) {
case kTestCopyTableDstTableExist:
return { args };
case kTestCopyTableDstTableNotExist:
- args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
+ args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
+ return { args };
+ case kTestCopyTableInsertIgnore:
+ args.mode = TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE;
return { args };
case kTestCopyTableUpsert:
- args.mode = TableCopyMode::UPSERT_TO_EXIST_TABLE;
+ args.mode = TableCopyMode::UPSERT_TO_EXISTING_TABLE;
+ return { args };
+ case kTestCopyTableUpsertIgnore:
+ args.mode = TableCopyMode::UPSERT_IGNORE_TO_EXISTING_TABLE;
return { args };
case kTestCopyTableSchemaOnly: {
args.mode = TableCopyMode::COPY_SCHEMA_ONLY;
@@ -911,7 +952,7 @@ class ToolTestCopyTableParameterized :
}
case kTestCopyTableComplexSchema: {
args.columns = kComplexSchemaColumns;
- args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
+ args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
vector<RunCopyTableCheckArgs> multi_args;
{
auto args_temp = args;
@@ -951,7 +992,7 @@ class ToolTestCopyTableParameterized :
return multi_args;
}
case kTestCopyUnpartitionedTable: {
- args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
+ args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
vector<RunCopyTableCheckArgs> multi_args;
{
auto args_temp = args;
@@ -1176,7 +1217,9 @@ INSTANTIATE_TEST_SUITE_P(CopyTableParameterized,
ToolTestCopyTableParameterized,
::testing::Values(kTestCopyTableDstTableExist,
kTestCopyTableDstTableNotExist,
+ kTestCopyTableInsertIgnore,
kTestCopyTableUpsert,
+ kTestCopyTableUpsertIgnore,
kTestCopyTableSchemaOnly,
kTestCopyTableComplexSchema,
kTestCopyUnpartitionedTable,
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index c7ea9744c..8178c7379 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -129,9 +129,12 @@ DEFINE_bool(report_scanner_stats, false,
DEFINE_bool(show_values, false,
"Whether to show values of scanned rows.");
DEFINE_string(write_type, "insert",
- "How data should be copied to the destination table. Valid values are 'insert', "
- "'upsert' or the empty string. If the empty string, data will not be copied "
- "(useful when --create_table=true).");
+ "Write operation type to use when populating the destination "
+ "table with the rows from the source table. Choose from "
+ "'insert', 'insert_ignore', 'upsert', 'upsert_ignore', or an "
+ "empty string. Empty string means the data isn't going to be "
+ "copied, which is useful with --create_table=true when just "
+ "creating the destination table without copying the data.");
DEFINE_string(replica_selection, "CLOSEST",
"Replica selection for scan operations. Acceptable values are: "
"CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and "
@@ -165,9 +168,20 @@ bool IsFlagValueAcceptable(const char* flag_name,
return false;
}
+constexpr const char* const kWriteTypeInsert = "insert";
+constexpr const char* const kWriteTypeInsertIgnore = "insert_ignore";
+constexpr const char* const kWriteTypeUpsert = "upsert";
+constexpr const char* const kWriteTypeUpsertIgnore = "upsert_ignore";
+
bool ValidateWriteType(const char* flag_name,
const string& flag_value) {
- static const vector<string> kWriteTypes = { "insert", "upsert", "" };
+ static const vector<string> kWriteTypes = {
+ "",
+ kWriteTypeInsert,
+ kWriteTypeInsertIgnore,
+ kWriteTypeUpsert,
+ kWriteTypeUpsertIgnore,
+ };
return IsFlagValueAcceptable(flag_name, flag_value, kWriteTypes);
}
@@ -535,7 +549,7 @@ Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>& src_table
return Status::OK();
}
-void CheckPendingErrors(const client::sp::shared_ptr<KuduSession>& session) {
+void CheckPendingErrors(KuduSession* session) {
vector<KuduError*> errors;
ElementDeleter d(&errors);
session->GetPendingErrors(&errors, nullptr);
@@ -623,9 +637,24 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread
} while (0)
DCHECK(thread_status);
+ KuduWriteOperation::Type op_type;
+ const auto& op_type_str = FLAGS_write_type;
+ if (op_type_str == kWriteTypeInsert) {
+ op_type = KuduWriteOperation::INSERT;
+ } else if (op_type_str == kWriteTypeInsertIgnore) {
+ op_type = KuduWriteOperation::INSERT_IGNORE;
+ } else if (op_type_str == kWriteTypeUpsert) {
+ op_type = KuduWriteOperation::UPSERT;
+ } else if (op_type_str == kWriteTypeUpsertIgnore) {
+ op_type = KuduWriteOperation::UPSERT_IGNORE;
+ } else {
+ *thread_status = Status::InvalidArgument(Substitute(
+ "invalid write operation type: $0", op_type_str));
+ return;
+ }
+
client::sp::shared_ptr<KuduTable> dst_table;
TASK_RET_NOT_OK((*dst_client_)->OpenTable(*dst_table_name_, &dst_table));
- const auto& dst_table_schema = dst_table->schema();
// One session per thread.
client::sp::shared_ptr<KuduSession> session((*dst_client_)->NewSession());
@@ -633,13 +662,22 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread
TASK_RET_NOT_OK(session->SetErrorBufferSpace(1024 * 1024));
session->SetTimeoutMillis(FLAGS_timeout_ms);
- *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
+ // The lambda passed to ScanData() as the callback takes over the shared
+ // pointers to the session and the destination table, so both objects are
+ // guaranteed to be alive whenever the callback is invoked.
+ *thread_status = ScanData(tokens, [table = std::move(dst_table),
+ session = std::move(session),
+ op_type] (const KuduScanBatch& batch) {
+ auto* s_ptr = session.get();
+ auto* t_ptr = table.get();
for (const auto& row : batch) {
- RETURN_NOT_OK(AddRow(dst_table, dst_table_schema, row, session));
+ RETURN_NOT_OK(AddRow(s_ptr, t_ptr, row, op_type));
}
- // Flush here to make sure all write operations have been sent.
- auto s = session->Flush();
- CheckPendingErrors(session);
+ // Flush the session to make sure all write operations have been sent
+ // to the server. If any errors happen, CheckPendingErrors() will report
+ // on them.
+ auto s = s_ptr->Flush();
+ CheckPendingErrors(s_ptr);
return s;
});
@@ -791,23 +829,34 @@ Status TableScanner::StartCopy() {
return StartWork(WorkType::kCopy);
}
-Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
- const KuduSchema& table_schema,
+Status TableScanner::AddRow(KuduSession* session,
+ KuduTable* table,
const KuduScanBatch::RowPtr& src_row,
- const client::sp::shared_ptr<KuduSession>& session) {
+ KuduWriteOperation::Type write_op_type) {
unique_ptr<KuduWriteOperation> write_op;
- if (FLAGS_write_type == "insert") {
- write_op.reset(table->NewInsert());
- } else if (FLAGS_write_type == "upsert") {
- write_op.reset(table->NewUpsert());
- } else {
- LOG(FATAL) << Substitute("invalid write_type: $0", FLAGS_write_type);
+ switch (write_op_type) {
+ case KuduWriteOperation::INSERT:
+ write_op.reset(table->NewInsert());
+ break;
+ case KuduWriteOperation::INSERT_IGNORE:
+ write_op.reset(table->NewInsertIgnore());
+ break;
+ case KuduWriteOperation::UPSERT:
+ write_op.reset(table->NewUpsert());
+ break;
+ case KuduWriteOperation::UPSERT_IGNORE:
+ write_op.reset(table->NewUpsertIgnore());
+ break;
+ default:
+ return Status::InvalidArgument(
+ Substitute("unexpected op type: $0", write_op_type));
+ break; // unreachable
}
- KuduPartialRow* dst_row = write_op->mutable_row();
- size_t row_size = ContiguousRowHelper::row_size(*src_row.schema_);
- memcpy(dst_row->row_data_, src_row.row_data_, row_size);
- BitmapChangeBits(dst_row->isset_bitmap_, 0, table_schema.num_columns(), true);
+ auto* dst_row = write_op->mutable_row();
+ memcpy(dst_row->row_data_, src_row.row_data_,
+ ContiguousRowHelper::row_size(*src_row.schema_));
+ BitmapChangeBits(dst_row->isset_bitmap_, 0, table->schema().num_columns(), true);
return session->Apply(write_op.release());
}
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index b8608c64b..13dd39d39 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -21,24 +21,20 @@
#include <cstdint>
#include <functional>
#include <iosfwd>
-#include <optional>
#include <memory>
+#include <optional>
#include <string>
#include <vector>
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/write_op.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"
namespace kudu {
-
-namespace client {
-class KuduSchema;
-} // namespace client
-
namespace tools {
// This class is not thread-safe.
@@ -78,11 +74,10 @@ class TableScanner {
kCopy
};
- static Status AddRow(
- const client::sp::shared_ptr<client::KuduTable>& table,
- const client::KuduSchema& table_schema,
- const client::KuduScanBatch::RowPtr& src_row,
- const client::sp::shared_ptr<client::KuduSession>& session);
+ static Status AddRow(client::KuduSession* session,
+ client::KuduTable* table,
+ const client::KuduScanBatch::RowPtr& src_row,
+ client::KuduWriteOperation::Type write_op_type);
// Convert replica selection from string into the KuduClient::ReplicaSelection
// enumerator.