You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/02/16 21:28:00 UTC
[kudu] 01/02: [tool] fix a command bug, cmd: kudu wal dump ...
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 9f164b3c542a2e44c963c43488b4fc5b0d0f0f65
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Thu Jan 20 23:04:19 2022 +0800
[tool] fix a command bug, cmd: kudu wal dump ...
The 'kudu wal dump' command is interrupted after reading an alter schema entry.
Change-Id: I27acc71597d038cafbbe687117bddb1ce16576c0
Reviewed-on: http://gerrit.cloudera.org:8080/18169
Tested-by: Kudu Jenkins
Reviewed-by: Abhishek Chennaka <ac...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/common/wire_protocol-test-util.h | 41 +++++-
src/kudu/tools/kudu-tool-test.cc | 218 ++++++++++++++++++++++++++++++
src/kudu/tools/tool_action_common.cc | 14 +-
3 files changed, 269 insertions(+), 4 deletions(-)
diff --git a/src/kudu/common/wire_protocol-test-util.h b/src/kudu/common/wire_protocol-test-util.h
index a5a0412..2fcf03b 100644
--- a/src/kudu/common/wire_protocol-test-util.h
+++ b/src/kudu/common/wire_protocol-test-util.h
@@ -20,6 +20,7 @@
#include "kudu/common/wire_protocol.h"
+#include <map>
#include <string>
#include "kudu/common/partial_row.h"
@@ -35,6 +36,13 @@ inline Schema GetSimpleTestSchema() {
1);
}
+// Sets every column listed in 'columns' (column name -> string value) on
+// 'row' through KuduPartialRow::SetStringCopy(); each call is wrapped in
+// CHECK_OK. An empty map leaves 'row' untouched.
+//
+// Names and values are handed over as std::string objects rather than via
+// c_str(), so their length travels with them: a value holding an embedded
+// NUL byte is no longer cut short and no extra scan for the terminator is
+// needed.
+inline void RowAppendColumn(KuduPartialRow* row,
+                            const std::map<std::string, std::string>& columns) {
+  for (const auto& column : columns) {
+    CHECK_OK(row->SetStringCopy(column.first, column.second));
+  }
+}
+
inline void AddTestRowWithNullableStringToPB(RowOperationsPB::Type op_type,
const Schema& schema,
int32_t key,
@@ -46,12 +54,32 @@ inline void AddTestRowWithNullableStringToPB(RowOperationsPB::Type op_type,
CHECK_OK(row.SetInt32("key", key));
CHECK_OK(row.SetInt32("int_val", int_val));
if (string_val) {
- CHECK_OK(row.SetStringCopy("string_val", string_val));
+ RowAppendColumn(&row, {{"string_val", std::string(string_val)}});
}
RowOperationsPBEncoder enc(ops);
enc.Add(op_type, row);
}
+// Builds a row against 'schema' and encodes it into 'ops' as an operation of
+// type 'op_type'. "key" and "int_val" are always set; "string_val" is set
+// only when 'string_val' is non-null; every entry of 'columns' (column name
+// -> string value) is then applied through RowAppendColumn().
+inline void AddTestRowWithNullableColumnsStringToPB(
+    RowOperationsPB::Type op_type,
+    const Schema& schema,
+    int32_t key,
+    int32_t int_val,
+    const char* string_val,
+    const std::map<std::string, std::string>& columns,
+    RowOperationsPB* ops) {
+  DCHECK(schema.initialized());
+
+  KuduPartialRow partial_row(&schema);
+  CHECK_OK(partial_row.SetInt32("key", key));
+  CHECK_OK(partial_row.SetInt32("int_val", int_val));
+
+  if (string_val != nullptr) {
+    const std::map<std::string, std::string> base_column = {
+        {"string_val", std::string(string_val)}};
+    RowAppendColumn(&partial_row, base_column);
+  }
+  if (!columns.empty()) {
+    RowAppendColumn(&partial_row, columns);
+  }
+
+  RowOperationsPBEncoder encoder(ops);
+  encoder.Add(op_type, partial_row);
+}
+
+
inline void AddTestRowToPB(RowOperationsPB::Type op_type,
const Schema& schema,
int32_t key,
@@ -61,6 +89,17 @@ inline void AddTestRowToPB(RowOperationsPB::Type op_type,
AddTestRowWithNullableStringToPB(op_type, schema, key, int_val, string_val.c_str(), ops);
}
+// Variant of AddTestRowWithNullableColumnsStringToPB() that takes
+// 'string_val' as a std::string: its c_str() is forwarded together with all
+// other arguments unchanged.
+inline void AddTestRowToPBAppendColumns(
+    RowOperationsPB::Type op_type,
+    const Schema& schema,
+    int32_t key,
+    int32_t int_val,
+    const std::string& string_val,
+    const std::map<std::string, std::string>& columns,
+    RowOperationsPB* ops) {
+  const char* const string_val_cstr = string_val.c_str();
+  AddTestRowWithNullableColumnsStringToPB(
+      op_type, schema, key, int_val, string_val_cstr, columns, ops);
+}
+
inline void AddTestKeyToPB(RowOperationsPB::Type op_type,
const Schema& schema,
int32_t key,
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 72898c2..2383b2f 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2128,6 +2128,224 @@ TEST_F(ToolTest, TestWalDump) {
}
}
+TEST_F(ToolTest, TestWalDumpWithAlterSchema) { // Dumps a WAL that contains an ALTER_SCHEMA_OP.
+ const string kTestDir = GetTestPath("test");
+ const string kTestTablet = "ffffffffffffffffffffffffffffffff";
+ Schema schema(GetSimpleTestSchema());
+ Schema schema_with_ids(SchemaBuilder(schema).Build());
+
+ FsManager fs(env_, FsManagerOpts(kTestDir));
+ ASSERT_OK(fs.CreateInitialFileSystemLayout());
+ ASSERT_OK(fs.Open());
+
+ const std::string kFirstMessage("this is a test insert");
+ const std::string kAddColumnName1("addcolumn1_addcolumn1");
+ const std::string kAddColumnName2("addcolumn2_addcolumn2");
+ const std::string kAddColumnName1Message("insert a record, after alter schema");
+ const std::string kAddColumnName2Message("upsert a record, after alter schema");
+ { // Write four replicate messages (ops 1.1 through 1.4) into a new log.
+ scoped_refptr<Log> log;
+ ASSERT_OK(Log::Open(LogOptions(),
+ &fs,
+ /*file_cache*/nullptr,
+ kTestTablet,
+ schema_with_ids,
+ 0, // schema_version
+ /*metric_entity*/nullptr,
+ &log));
+
+ std::vector<ReplicateRefPtr> replicates;
+ { // Op 1.1: INSERT built with the original schema.
+ OpId opid = consensus::MakeOpId(1, 1);
+ ReplicateRefPtr replicate =
+ consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+ replicate->get()->set_op_type(consensus::WRITE_OP);
+ replicate->get()->mutable_id()->CopyFrom(opid);
+ replicate->get()->set_timestamp(1);
+ WriteRequestPB* write = replicate->get()->mutable_write_request();
+ ASSERT_OK(SchemaToPB(schema, write->mutable_schema()));
+ AddTestRowToPB(RowOperationsPB::INSERT, schema,
+ opid.index(), 0, kFirstMessage,
+ write->mutable_row_operations());
+ write->set_tablet_id(kTestTablet);
+ replicates.emplace_back(replicate);
+ }
+ { // Op 1.2: ALTER_SCHEMA_OP whose schema has two extra STRING columns.
+ OpId opid = consensus::MakeOpId(1, 2);
+ ReplicateRefPtr replicate =
+ consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+ replicate->get()->set_op_type(consensus::ALTER_SCHEMA_OP);
+ replicate->get()->mutable_id()->CopyFrom(opid);
+ replicate->get()->set_timestamp(2);
+ tserver::AlterSchemaRequestPB* alter_schema =
+ replicate->get()->mutable_alter_schema_request();
+ SchemaBuilder schema_builder = SchemaBuilder(schema);
+ ASSERT_OK(schema_builder.AddColumn(kAddColumnName1, STRING, true, nullptr, nullptr));
+ ASSERT_OK(schema_builder.AddColumn(kAddColumnName2, STRING, true, nullptr, nullptr));
+ schema = schema_builder.BuildWithoutIds(); // Ops 1.3 and 1.4 below use this altered schema.
+ schema_with_ids = SchemaBuilder(schema).Build();
+ ASSERT_OK(SchemaToPB(schema_with_ids, alter_schema->mutable_schema()));
+ alter_schema->set_tablet_id(kTestTablet);
+ alter_schema->set_schema_version(1);
+ replicates.emplace_back(replicate);
+ }
+ { // Op 1.3: INSERT that also sets the first added column.
+ OpId opid = consensus::MakeOpId(1, 3);
+ ReplicateRefPtr replicate =
+ consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+ replicate->get()->set_op_type(consensus::WRITE_OP);
+ replicate->get()->mutable_id()->CopyFrom(opid);
+ replicate->get()->set_timestamp(3);
+ WriteRequestPB* write = replicate->get()->mutable_write_request();
+ ASSERT_OK(SchemaToPB(schema, write->mutable_schema()));
+ AddTestRowToPBAppendColumns(RowOperationsPB::INSERT, schema,
+ opid.index(), 2, kFirstMessage,
+ {{kAddColumnName1, kAddColumnName1Message}},
+ write->mutable_row_operations());
+ write->set_tablet_id(kTestTablet);
+ replicates.emplace_back(replicate);
+ }
+ { // Op 1.4: UPSERT that sets both added columns.
+ OpId opid = consensus::MakeOpId(1, 4);
+ ReplicateRefPtr replicate =
+ consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+ replicate->get()->set_op_type(consensus::WRITE_OP);
+ replicate->get()->mutable_id()->CopyFrom(opid);
+ replicate->get()->set_timestamp(4);
+ WriteRequestPB* write = replicate->get()->mutable_write_request();
+ ASSERT_OK(SchemaToPB(schema, write->mutable_schema()));
+ AddTestRowToPBAppendColumns(RowOperationsPB::UPSERT, schema,
+ 1, 1111, kFirstMessage,
+ {
+ {kAddColumnName1, kAddColumnName1Message},
+ {kAddColumnName2, kAddColumnName2Message},
+ },
+ write->mutable_row_operations());
+ write->set_tablet_id(kTestTablet);
+ replicates.emplace_back(replicate);
+ }
+
+ Synchronizer s; // Append all four messages and wait for the append callback.
+ ASSERT_OK(log->AsyncAppendReplicates(replicates, s.AsStatusCallback()));
+ ASSERT_OK(s.Wait());
+ }
+
+ string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1);
+ string stdout;
+ for (const auto& args : { Substitute("wal dump $0", wal_path), // Same checks for both commands.
+ Substitute("local_replica dump wals --fs_wal_dir=$0 $1",
+ kTestDir, kTestTablet)
+ }) {
+ SCOPED_TRACE(args);
+ for (const auto& print_entries : { "true", "1", "yes", "decoded" }) { // Expect op ids and row data.
+ SCOPED_TRACE(print_entries);
+ NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=$1",
+ args, print_entries), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_MATCHES(stdout, "1\\.2@2");
+ ASSERT_STR_MATCHES(stdout, "1\\.3@3");
+ ASSERT_STR_MATCHES(stdout, "1\\.4@4");
+ ASSERT_STR_MATCHES(stdout, kFirstMessage);
+ ASSERT_STR_MATCHES(stdout, kAddColumnName1);
+ ASSERT_STR_MATCHES(stdout, "ALTER_SCHEMA_OP");
+ ASSERT_STR_MATCHES(stdout, kAddColumnName1Message);
+ ASSERT_STR_MATCHES(stdout, kAddColumnName2Message);
+ ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ for (const auto& print_entries : { "false", "0", "no" }) { // Expect only header and footer.
+ SCOPED_TRACE(print_entries);
+ NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=$1",
+ args, print_entries), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4");
+ ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage);
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1);
+ ASSERT_STR_NOT_MATCHES(stdout, "ALTER_SCHEMA_OP");
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message);
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message);
+ ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ { // --print_entries=pb: expect "row_operations {" and row data, no op-id lines.
+ NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=pb",
+ args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4");
+ ASSERT_STR_MATCHES(stdout, kFirstMessage);
+ ASSERT_STR_MATCHES(stdout, kAddColumnName1);
+ ASSERT_STR_MATCHES(stdout, "ALTER_SCHEMA_OP");
+ ASSERT_STR_MATCHES(stdout, kAddColumnName1Message);
+ ASSERT_STR_MATCHES(stdout, kAddColumnName2Message);
+ ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ { // pb with --truncate_data=1: test strings absent, "t<truncated>" present.
+ NO_FATALS(RunActionStdoutString(Substitute(
+ "$0 --print_entries=pb --truncate_data=1", args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4");
+ ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage);
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1);
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message);
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message);
+ ASSERT_STR_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ { // --print_entries=id: op ids present, no row data.
+ NO_FATALS(RunActionStdoutString(Substitute(
+ "$0 --print_entries=id", args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_MATCHES(stdout, "1\\.2@2");
+ ASSERT_STR_MATCHES(stdout, "1\\.3@3");
+ ASSERT_STR_MATCHES(stdout, "1\\.4@4");
+ ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage);
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1);
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message);
+ ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message);
+ ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ { // --print_meta=false: no header or footer, op ids and row data still shown.
+ NO_FATALS(RunActionStdoutString(Substitute(
+ "$0 --print_meta=false", args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_NOT_MATCHES(stdout, "Header:");
+ ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_MATCHES(stdout, "1\\.2@2");
+ ASSERT_STR_MATCHES(stdout, "1\\.3@3");
+ ASSERT_STR_MATCHES(stdout, "1\\.4@4");
+ ASSERT_STR_MATCHES(stdout, kFirstMessage);
+ ASSERT_STR_MATCHES(stdout, kAddColumnName1);
+ ASSERT_STR_MATCHES(stdout, kAddColumnName1Message);
+ ASSERT_STR_MATCHES(stdout, kAddColumnName2Message);
+ ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_NOT_MATCHES(stdout, "Footer:");
+ }
+ }
+}
+
TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff";
constexpr const char* const kTestTableId = "test-table";
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index a89e0b4..b615b71 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -74,6 +74,7 @@
#include "kudu/tools/tool.pb.h" // IWYU pragma: keep
#include "kudu/tools/tool_action.h"
#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h" // IWYU pragma: keep
#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
#include "kudu/util/async_util.h"
@@ -345,7 +346,7 @@ Status PrintDecodedWriteRequestPB(const string& indent,
return Status::OK();
}
-Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
+Status PrintDecoded(const LogEntryPB& entry, Schema* tablet_schema) {
PrintIdOnly(entry);
const string indent = "\t";
@@ -356,9 +357,16 @@ Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
if (replicate.op_type() == consensus::WRITE_OP) {
RETURN_NOT_OK(PrintDecodedWriteRequestPB(
indent,
- tablet_schema,
+ *tablet_schema,
replicate.write_request(),
replicate.has_request_id() ? &replicate.request_id() : nullptr));
+ } else if (replicate.op_type() == consensus::ALTER_SCHEMA_OP) {
+ if (!replicate.has_alter_schema_request()) {
+ LOG(ERROR) << "read an ALTER_SCHEMA_OP log entry, but has no alter_schema_request";
+ return Status::RuntimeError("ALTER_SCHEMA_OP log entry has no alter_schema_request");
+ }
+ RETURN_NOT_OK(SchemaFromPB(replicate.alter_schema_request().schema(), tablet_schema));
+ cout << indent << SecureShortDebugString(replicate) << endl;
} else {
cout << indent << SecureShortDebugString(replicate) << endl;
}
@@ -555,7 +563,7 @@ Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) {
cout << "Entry:\n" << SecureDebugString(*entry);
} else if (print_type == PRINT_DECODED) {
- RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema));
+ RETURN_NOT_OK(PrintDecoded(*entry, &tablet_schema));
} else if (print_type == PRINT_ID) {
PrintIdOnly(*entry);
}