You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/02/16 21:28:00 UTC

[kudu] 01/02: [tool] fix a command bug, cmd: kudu wal dump ...

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9f164b3c542a2e44c963c43488b4fc5b0d0f0f65
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Thu Jan 20 23:04:19 2022 +0800

    [tool] fix a command bug, cmd: kudu wal dump ...
    
    kudu wal dump command will interrupt after reading alter schema entry.
    
    Change-Id: I27acc71597d038cafbbe687117bddb1ce16576c0
    Reviewed-on: http://gerrit.cloudera.org:8080/18169
    Tested-by: Kudu Jenkins
    Reviewed-by: Abhishek Chennaka <ac...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/common/wire_protocol-test-util.h |  41 +++++-
 src/kudu/tools/kudu-tool-test.cc          | 218 ++++++++++++++++++++++++++++++
 src/kudu/tools/tool_action_common.cc      |  14 +-
 3 files changed, 269 insertions(+), 4 deletions(-)

diff --git a/src/kudu/common/wire_protocol-test-util.h b/src/kudu/common/wire_protocol-test-util.h
index a5a0412..2fcf03b 100644
--- a/src/kudu/common/wire_protocol-test-util.h
+++ b/src/kudu/common/wire_protocol-test-util.h
@@ -20,6 +20,7 @@
 
 #include "kudu/common/wire_protocol.h"
 
+#include <map>
 #include <string>
 
 #include "kudu/common/partial_row.h"
@@ -35,6 +36,13 @@ inline Schema GetSimpleTestSchema() {
                 1);
 }
 
+inline void RowAppendColumn(KuduPartialRow* row,
+                            const std::map<std::string, std::string>& columns) {
+  for (const auto& column : columns) {
+    CHECK_OK(row->SetStringCopy(column.first.c_str(), column.second.c_str()));
+  }
+}
+
 inline void AddTestRowWithNullableStringToPB(RowOperationsPB::Type op_type,
                                              const Schema& schema,
                                              int32_t key,
@@ -46,12 +54,32 @@ inline void AddTestRowWithNullableStringToPB(RowOperationsPB::Type op_type,
   CHECK_OK(row.SetInt32("key", key));
   CHECK_OK(row.SetInt32("int_val", int_val));
   if (string_val) {
-    CHECK_OK(row.SetStringCopy("string_val", string_val));
+    RowAppendColumn(&row, {{"string_val", std::string(string_val)}});
   }
   RowOperationsPBEncoder enc(ops);
   enc.Add(op_type, row);
 }
 
+inline void AddTestRowWithNullableColumnsStringToPB(
+    RowOperationsPB::Type op_type, const Schema& schema,
+    int32_t key, int32_t int_val, const char* string_val,
+    const std::map<std::string, std::string>& columns,
+    RowOperationsPB* ops) {
+  DCHECK(schema.initialized());
+  KuduPartialRow row(&schema);
+  CHECK_OK(row.SetInt32("key", key));
+  CHECK_OK(row.SetInt32("int_val", int_val));
+  if (string_val) {
+    RowAppendColumn(&row, {{"string_val", std::string(string_val)}});
+  }
+  if (!columns.empty()) {
+    RowAppendColumn(&row, columns);
+  }
+  RowOperationsPBEncoder enc(ops);
+  enc.Add(op_type, row);
+}
+
+
 inline void AddTestRowToPB(RowOperationsPB::Type op_type,
                            const Schema& schema,
                            int32_t key,
@@ -61,6 +89,17 @@ inline void AddTestRowToPB(RowOperationsPB::Type op_type,
   AddTestRowWithNullableStringToPB(op_type, schema, key, int_val, string_val.c_str(), ops);
 }
 
+inline void AddTestRowToPBAppendColumns(RowOperationsPB::Type op_type,
+                                        const Schema& schema, int32_t key,
+                                        int32_t int_val,
+                                        const std::string& string_val,
+                                        const std::map<std::string, std::string>& columns,
+                                        RowOperationsPB* ops) {
+  AddTestRowWithNullableColumnsStringToPB(op_type, schema,
+                                          key, int_val, string_val.c_str(),
+                                          columns, ops);
+}
+
 inline void AddTestKeyToPB(RowOperationsPB::Type op_type,
                     const Schema& schema,
                     int32_t key,
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 72898c2..2383b2f 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2128,6 +2128,224 @@ TEST_F(ToolTest, TestWalDump) {
   }
 }
 
+TEST_F(ToolTest, TestWalDumpWithAlterSchema) {
+  const string kTestDir = GetTestPath("test");
+  const string kTestTablet = "ffffffffffffffffffffffffffffffff";
+  Schema schema(GetSimpleTestSchema());
+  Schema schema_with_ids(SchemaBuilder(schema).Build());
+
+  FsManager fs(env_, FsManagerOpts(kTestDir));
+  ASSERT_OK(fs.CreateInitialFileSystemLayout());
+  ASSERT_OK(fs.Open());
+
+  const std::string kFirstMessage("this is a test insert");
+  const std::string kAddColumnName1("addcolumn1_addcolumn1");
+  const std::string kAddColumnName2("addcolumn2_addcolumn2");
+  const std::string kAddColumnName1Message("insert a record, after alter schema");
+  const std::string kAddColumnName2Message("upsert a record, after alter schema");
+  {
+    scoped_refptr<Log> log;
+    ASSERT_OK(Log::Open(LogOptions(),
+                        &fs,
+                        /*file_cache*/nullptr,
+                        kTestTablet,
+                        schema_with_ids,
+                        0, // schema_version
+                        /*metric_entity*/nullptr,
+                        &log));
+
+    std::vector<ReplicateRefPtr> replicates;
+    {
+      OpId opid = consensus::MakeOpId(1, 1);
+      ReplicateRefPtr replicate =
+          consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+      replicate->get()->set_op_type(consensus::WRITE_OP);
+      replicate->get()->mutable_id()->CopyFrom(opid);
+      replicate->get()->set_timestamp(1);
+      WriteRequestPB* write = replicate->get()->mutable_write_request();
+      ASSERT_OK(SchemaToPB(schema, write->mutable_schema()));
+      AddTestRowToPB(RowOperationsPB::INSERT, schema,
+                     opid.index(), 0, kFirstMessage,
+                     write->mutable_row_operations());
+      write->set_tablet_id(kTestTablet);
+      replicates.emplace_back(replicate);
+    }
+    {
+      OpId opid = consensus::MakeOpId(1, 2);
+      ReplicateRefPtr replicate =
+          consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+      replicate->get()->set_op_type(consensus::ALTER_SCHEMA_OP);
+      replicate->get()->mutable_id()->CopyFrom(opid);
+      replicate->get()->set_timestamp(2);
+      tserver::AlterSchemaRequestPB* alter_schema =
+          replicate->get()->mutable_alter_schema_request();
+      SchemaBuilder schema_builder = SchemaBuilder(schema);
+      ASSERT_OK(schema_builder.AddColumn(kAddColumnName1, STRING, true, nullptr, nullptr));
+      ASSERT_OK(schema_builder.AddColumn(kAddColumnName2, STRING, true, nullptr, nullptr));
+      schema = schema_builder.BuildWithoutIds();
+      schema_with_ids = SchemaBuilder(schema).Build();
+      ASSERT_OK(SchemaToPB(schema_with_ids, alter_schema->mutable_schema()));
+      alter_schema->set_tablet_id(kTestTablet);
+      alter_schema->set_schema_version(1);
+      replicates.emplace_back(replicate);
+    }
+    {
+      OpId opid = consensus::MakeOpId(1, 3);
+      ReplicateRefPtr replicate =
+          consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+      replicate->get()->set_op_type(consensus::WRITE_OP);
+      replicate->get()->mutable_id()->CopyFrom(opid);
+      replicate->get()->set_timestamp(3);
+      WriteRequestPB* write = replicate->get()->mutable_write_request();
+      ASSERT_OK(SchemaToPB(schema, write->mutable_schema()));
+      AddTestRowToPBAppendColumns(RowOperationsPB::INSERT, schema,
+                                  opid.index(), 2, kFirstMessage,
+                                  {{kAddColumnName1, kAddColumnName1Message}},
+                                  write->mutable_row_operations());
+      write->set_tablet_id(kTestTablet);
+      replicates.emplace_back(replicate);
+    }
+    {
+      OpId opid = consensus::MakeOpId(1, 4);
+      ReplicateRefPtr replicate =
+          consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+      replicate->get()->set_op_type(consensus::WRITE_OP);
+      replicate->get()->mutable_id()->CopyFrom(opid);
+      replicate->get()->set_timestamp(4);
+      WriteRequestPB* write = replicate->get()->mutable_write_request();
+      ASSERT_OK(SchemaToPB(schema, write->mutable_schema()));
+      AddTestRowToPBAppendColumns(RowOperationsPB::UPSERT, schema,
+                                  1, 1111, kFirstMessage,
+                                  {
+                                  {kAddColumnName1, kAddColumnName1Message},
+                                  {kAddColumnName2, kAddColumnName2Message},
+                                  },
+                                  write->mutable_row_operations());
+      write->set_tablet_id(kTestTablet);
+      replicates.emplace_back(replicate);
+    }
+
+    Synchronizer s;
+    ASSERT_OK(log->AsyncAppendReplicates(replicates, s.AsStatusCallback()));
+    ASSERT_OK(s.Wait());
+  }
+
+  string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1);
+  string stdout;
+  for (const auto& args : { Substitute("wal dump $0", wal_path),
+                            Substitute("local_replica dump wals --fs_wal_dir=$0 $1",
+                                       kTestDir, kTestTablet)
+                           }) {
+    SCOPED_TRACE(args);
+    for (const auto& print_entries : { "true", "1", "yes", "decoded" }) {
+      SCOPED_TRACE(print_entries);
+      NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=$1",
+                                                 args, print_entries), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_MATCHES(stdout, "1\\.2@2");
+      ASSERT_STR_MATCHES(stdout, "1\\.3@3");
+      ASSERT_STR_MATCHES(stdout, "1\\.4@4");
+      ASSERT_STR_MATCHES(stdout, kFirstMessage);
+      ASSERT_STR_MATCHES(stdout, kAddColumnName1);
+      ASSERT_STR_MATCHES(stdout, "ALTER_SCHEMA_OP");
+      ASSERT_STR_MATCHES(stdout, kAddColumnName1Message);
+      ASSERT_STR_MATCHES(stdout, kAddColumnName2Message);
+      ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    for (const auto& print_entries : { "false", "0", "no" }) {
+      SCOPED_TRACE(print_entries);
+      NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=$1",
+                                                 args, print_entries), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4");
+      ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage);
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1);
+      ASSERT_STR_NOT_MATCHES(stdout, "ALTER_SCHEMA_OP");
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message);
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message);
+      ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=pb",
+                                                 args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4");
+      ASSERT_STR_MATCHES(stdout, kFirstMessage);
+      ASSERT_STR_MATCHES(stdout, kAddColumnName1);
+      ASSERT_STR_MATCHES(stdout, "ALTER_SCHEMA_OP");
+      ASSERT_STR_MATCHES(stdout, kAddColumnName1Message);
+      ASSERT_STR_MATCHES(stdout, kAddColumnName2Message);
+      ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute(
+          "$0 --print_entries=pb --truncate_data=1", args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.2@2");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.3@3");
+      ASSERT_STR_NOT_MATCHES(stdout, "1\\.4@4");
+      ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage);
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1);
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message);
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message);
+      ASSERT_STR_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute(
+          "$0 --print_entries=id", args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_MATCHES(stdout, "Header:");
+      ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_MATCHES(stdout, "1\\.2@2");
+      ASSERT_STR_MATCHES(stdout, "1\\.3@3");
+      ASSERT_STR_MATCHES(stdout, "1\\.4@4");
+      ASSERT_STR_NOT_MATCHES(stdout, kFirstMessage);
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1);
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName1Message);
+      ASSERT_STR_NOT_MATCHES(stdout, kAddColumnName2Message);
+      ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+      ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_MATCHES(stdout, "Footer:");
+    }
+    {
+      NO_FATALS(RunActionStdoutString(Substitute(
+          "$0 --print_meta=false", args), &stdout));
+      SCOPED_TRACE(stdout);
+      ASSERT_STR_NOT_MATCHES(stdout, "Header:");
+      ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+      ASSERT_STR_MATCHES(stdout, "1\\.2@2");
+      ASSERT_STR_MATCHES(stdout, "1\\.3@3");
+      ASSERT_STR_MATCHES(stdout, "1\\.4@4");
+      ASSERT_STR_MATCHES(stdout, kFirstMessage);
+      ASSERT_STR_MATCHES(stdout, kAddColumnName1);
+      ASSERT_STR_MATCHES(stdout, kAddColumnName1Message);
+      ASSERT_STR_MATCHES(stdout, kAddColumnName2Message);
+      ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+      ASSERT_STR_NOT_MATCHES(stdout, "Footer:");
+    }
+  }
+}
+
 TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
   constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff";
   constexpr const char* const kTestTableId = "test-table";
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index a89e0b4..b615b71 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -74,6 +74,7 @@
 #include "kudu/tools/tool.pb.h" // IWYU pragma: keep
 #include "kudu/tools/tool_action.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/tserver/tserver_admin.proxy.h"   // IWYU pragma: keep
 #include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
 #include "kudu/util/async_util.h"
@@ -345,7 +346,7 @@ Status PrintDecodedWriteRequestPB(const string& indent,
   return Status::OK();
 }
 
-Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
+Status PrintDecoded(const LogEntryPB& entry, Schema* tablet_schema) {
   PrintIdOnly(entry);
 
   const string indent = "\t";
@@ -356,9 +357,16 @@ Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
     if (replicate.op_type() == consensus::WRITE_OP) {
       RETURN_NOT_OK(PrintDecodedWriteRequestPB(
           indent,
-          tablet_schema,
+          *tablet_schema,
           replicate.write_request(),
           replicate.has_request_id() ? &replicate.request_id() : nullptr));
+    } else if (replicate.op_type() == consensus::ALTER_SCHEMA_OP) {
+      if (!replicate.has_alter_schema_request()) {
+        LOG(ERROR) << "read an ALTER_SCHEMA_OP log entry, but has no alter_schema_request";
+        return Status::RuntimeError("ALTER_SCHEMA_OP log entry has no alter_schema_request");
+      }
+      RETURN_NOT_OK(SchemaFromPB(replicate.alter_schema_request().schema(), tablet_schema));
+      cout << indent << SecureShortDebugString(replicate) << endl;
     } else {
       cout << indent << SecureShortDebugString(replicate) << endl;
     }
@@ -555,7 +563,7 @@ Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) {
 
         cout << "Entry:\n" << SecureDebugString(*entry);
       } else if (print_type == PRINT_DECODED) {
-        RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema));
+        RETURN_NOT_OK(PrintDecoded(*entry, &tablet_schema));
       } else if (print_type == PRINT_ID) {
         PrintIdOnly(*entry);
       }