Posted to commits@kudu.apache.org by zh...@apache.org on 2022/09/16 08:07:29 UTC

[kudu] 02/02: [tablet] make compaction-test more robust

This is an automated email from the ASF dual-hosted git repository.

zhangyifan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ff2bc835d3e77374b01235360c1ea3fb57233f4d
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Sep 12 14:40:12 2022 -0700

    [tablet] make compaction-test more robust
    
    While investigating a compaction issue, I tried to run the
    benchmark-like scenarios from compaction-test, thinking it would help
    reproduce the problem.  Funnily enough, it turned out the test had
    issues of its own and crashed with SIGSEGV (the essential details are
    below).
    
    At a minimum, this patch addresses those test-only issues by:
      * handling triggered assertions properly (see the sketch below)
      * increasing kLargeRollThreshold to 8 GiB
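    
    For context on the first bullet: gtest's ASSERT_* macros only return
    from the function they execute in.  When the void helper
    FlushMRSAndReopenNoRoll() failed its ASSERT_EQ, the caller kept running
    with a null output rowset, and dereferencing it later is consistent
    with the SIGSEGV stack below.  The patch converts such helpers to
    return Status instead.  A minimal, self-contained sketch of that
    pattern (plain gtest plus a stand-in Status type, not Kudu's actual
    classes):
    
      #include <memory>
      #include <string>
      #include <vector>
      #include <gtest/gtest.h>
    
      // Stand-in for kudu::Status, only to keep the sketch self-contained.
      struct Status {
        static Status OK() { return {true, ""}; }
        static Status IllegalState(std::string m) { return {false, std::move(m)}; }
        bool ok() const { return ok_; }
        const std::string& ToString() const { return msg_; }
        bool ok_;
        std::string msg_;
      };
    
      // Fragile shape: an assertion failure only returns from this helper,
      // so the caller keeps going with '*out' still null.
      void HelperWithAssert(const std::vector<std::shared_ptr<int>>& produced,
                            std::shared_ptr<int>* out) {
        ASSERT_EQ(1, produced.size());  // on failure: plain 'return'
        *out = produced[0];
      }
    
      // Robust shape: report the problem as a Status the caller must check.
      Status HelperWithStatus(const std::vector<std::shared_ptr<int>>& produced,
                              std::shared_ptr<int>* out) {
        if (produced.size() != 1) {
          return Status::IllegalState("expected exactly one output rowset");
        }
        *out = produced[0];
        return Status::OK();
      }
    
      TEST(AssertPropagation, StatusStopsTheTest) {
        std::shared_ptr<int> rs;
        // With an empty input, the Status-based helper fails cleanly here...
        const Status s = HelperWithStatus({}, &rs);
        ASSERT_TRUE(s.ok()) << s.ToString();
        // ...whereas HelperWithAssert({}, &rs) would let execution continue
        // into code that dereferences the still-null 'rs'.
        ASSERT_NE(nullptr, rs.get());
      }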
    
    I also updated the schema of the test table, switching the 'val'
    column to INT64, and widened the row key format so that more rows can
    be inserted without duplicating the primary key.
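    
    For context on the key change: kRowKeyFormat moves from "hello %08d" to
    "hello %010ld", so keys stay fixed-width (and therefore sort in numeric
    order as strings) for larger row counts, and row keys and 'val' are
    widened to int64_t so that key arithmetic such as i * 10 + delta stays
    in range for large --merge_benchmark_num_rows_per_rowset values.  A
    tiny standalone illustration (hypothetical, not part of the patch):
    
      #include <cinttypes>
      #include <cstdint>
      #include <cstdio>
    
      int main() {
        // With 100M rows per rowset, generated keys reach 9-10 digits.
        const int64_t row_key = INT64_C(99999999) * 10 + 5;  // 999999995
        char old_key[32];
        char new_key[32];
        std::snprintf(old_key, sizeof(old_key), "hello %08" PRId64, row_key);
        std::snprintf(new_key, sizeof(new_key), "hello %010" PRId64, row_key);
        // old_key: "hello 999999995"  -- wider than the 8-digit padding, so
        //                                string order diverges from numeric order
        // new_key: "hello 0999999995" -- still fixed-width at 10 digits
        std::printf("%s\n%s\n", old_key, new_key);
        return 0;
      }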
    
    ------------------------------------------------------------------------
    
    $ KUDU_ALLOW_SLOW_TESTS=1 ./bin/compaction-test \
      --gtest_filter='TestCompaction.BenchmarkMerge*' \
      --merge_benchmark_num_rowsets=5 \
      --merge_benchmark_num_rows_per_rowset=100000000
    
    src/kudu/tablet/compaction-test.cc:395: Failure
    Expected equality of these values:
      1
      rowsets.size()
        Which is: 2
    ...
    I20220911 17:36:47.404556 56262 compaction-test.cc:506] Beginning compaction
    
    *** Aborted at 1662943010 (unix time) try "date -d @1662943010" if you are using GNU date ***
    PC: @                0x0 (unknown)
    *** SIGSEGV (@0x70) received by PID 56262 (TID 0x7f64cd6a3080) from PID 112; stack trace: ***
        @       0x9e1296 google::(anonymous namespace)::FailureSignalHandler()
        @       0x3ae0e0f710 (unknown)
        @       0xa88e70 kudu::tablet::CompactionInput::Create()
        @       0x9c46c2 kudu::tablet::TestCompaction::BuildCompactionInput()
        @       0x9cbac0 kudu::tablet::TestCompaction::DoBenchmark<>()
    ...
    
    Segmentation fault (core dumped)
    
    Change-Id: I664789e2178dfd8dc6b6f05f9064db1ac14d89e3
    Reviewed-on: http://gerrit.cloudera.org:8080/18981
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Abhishek Chennaka <ac...@cloudera.com>
    Reviewed-by: Mahesh Reddy <mr...@cloudera.com>
    Reviewed-by: Yifan Zhang <ch...@163.com>
---
 src/kudu/tablet/compaction-test.cc | 405 +++++++++++++++++++------------------
 1 file changed, 208 insertions(+), 197 deletions(-)

diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index e33f841f2..d00960f67 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -85,10 +85,10 @@ DEFINE_string(merge_benchmark_input_dir, "",
               "with id 00000 or 1111 and tablet id 'KuduCompactionBenchTablet', "
               "if this is specified. Otherwise, inputs will "
               "be generated as part of the test itself.");
-DEFINE_int32(merge_benchmark_num_rowsets, 3,
-             "Number of rowsets as input to the merge");
-DEFINE_int32(merge_benchmark_num_rows_per_rowset, 500000,
-             "Number of rowsets as input to the merge");
+DEFINE_uint32(merge_benchmark_num_rowsets, 3,
+              "Number of rowsets as input to the merge");
+DEFINE_uint32(merge_benchmark_num_rows_per_rowset, 500000,
+              "Number of rowsets as input to the merge");
 
 DECLARE_string(block_manager);
 
@@ -107,8 +107,8 @@ namespace tablet {
 
 class RowSetMetadata;
 
-constexpr const char* const kRowKeyFormat = "hello %08d";
-constexpr const size_t kLargeRollThreshold = 1024 * 1024 * 1024; // 1GB
+constexpr const char* const kRowKeyFormat = "hello %010ld";
+constexpr const size_t kLargeRollThreshold = 8UL * 1024 * 1024 * 1024;  // 8GB
 constexpr const size_t kSmallRollThreshold = 1024; // 1KB
 
 class TestCompaction : public KuduRowSetTest {
@@ -125,7 +125,7 @@ class TestCompaction : public KuduRowSetTest {
   static Schema CreateSchema() {
     SchemaBuilder builder;
     CHECK_OK(builder.AddKeyColumn("key", STRING));
-    CHECK_OK(builder.AddColumn("val", INT32));
+    CHECK_OK(builder.AddColumn("val", INT64));
     CHECK_OK(builder.AddNullableColumn("nullable_val", INT32));
     return builder.BuildWithoutIds();
   }
@@ -133,7 +133,7 @@ class TestCompaction : public KuduRowSetTest {
   // Insert n_rows rows of data.
   // Each row is the tuple: (string key=hello <n*10 + delta>, val=<n>)
   void InsertRows(MemRowSet* mrs, int n_rows, int delta) {
-    for (int32_t i = 0; i < n_rows; i++) {
+    for (int64_t i = 0; i < n_rows; i++) {
       InsertRow(mrs, i * 10 + delta, i);
     }
   }
@@ -141,7 +141,7 @@ class TestCompaction : public KuduRowSetTest {
   // Inserts a row.
   // The 'nullable_val' column is set to either NULL (when val is odd)
   // or 'val' (when val is even).
-  void InsertRow(MemRowSet* mrs, int row_key, int32_t val) {
+  void InsertRow(MemRowSet* mrs, int64_t row_key, int64_t val) {
     ScopedOp op(&mvcc_, clock_.Now());
     op.StartApplying();
     InsertRowInOp(mrs, op, row_key, val);
@@ -149,7 +149,7 @@ class TestCompaction : public KuduRowSetTest {
   }
 
   void DeleteAndInsertRow(MemRowSet* mrs_to_delete, MemRowSet* mrs_to_insert,
-                          int row_key, int32_t val, bool also_update) {
+                          int64_t row_key, int64_t val, bool also_update) {
     ScopedOp op(&mvcc_, clock_.Now());
     op.StartApplying();
     DeleteRowInOp(mrs_to_delete, op, row_key);
@@ -160,7 +160,7 @@ class TestCompaction : public KuduRowSetTest {
     op.FinishApplying();
   }
 
-  void InsertAndDeleteRow(MemRowSet* mrs, int row_key, int32_t val) {
+  void InsertAndDeleteRow(MemRowSet* mrs, int64_t row_key, int64_t val) {
     ScopedOp op(&mvcc_, clock_.Now());
     op.StartApplying();
     InsertRowInOp(mrs, op, row_key, val);
@@ -168,13 +168,13 @@ class TestCompaction : public KuduRowSetTest {
     op.FinishApplying();
   }
 
-  void BuildRow(int row_key, int32_t val) {
+  void BuildRow(int64_t row_key, int64_t val) {
     row_builder_.Reset();
     snprintf(key_buf_, sizeof(key_buf_), kRowKeyFormat, row_key);
     row_builder_.AddString(Slice(key_buf_));
-    row_builder_.AddInt32(val);
+    row_builder_.AddInt64(val);
     if (val % 2 == 0) {
-      row_builder_.AddInt32(val);
+      row_builder_.AddInt32(static_cast<int32_t>(val));
     } else {
       row_builder_.AddNull();
     }
@@ -182,8 +182,8 @@ class TestCompaction : public KuduRowSetTest {
 
   void InsertRowInOp(MemRowSet* mrs,
                      const ScopedOp& op,
-                     int row_key,
-                     int32_t val) {
+                     int64_t row_key,
+                     int64_t val) {
     BuildRow(row_key, val);
     if (*row_builder_.schema() != mrs->schema()) {
       // The MemRowSet is not projecting the row, so must be done by the caller
@@ -205,14 +205,14 @@ class TestCompaction : public KuduRowSetTest {
   // If 'val' is even, 'nullable_val' is set to NULL. Otherwise, set to 'val'.
   // Note that this is the opposite of InsertRow() above, so that the updates
   // flop NULL to non-NULL and vice versa.
-  void UpdateRows(RowSet* rowset, int n_rows, int delta, int32_t new_val) {
-    for (uint32_t i = 0; i < n_rows; i++) {
+  void UpdateRows(RowSet* rowset, uint32_t n_rows, int delta, int64_t new_val) {
+    for (int64_t i = 0; i < n_rows; i++) {
       SCOPED_TRACE(i);
       UpdateRow(rowset, i * 10 + delta, new_val);
     }
   }
 
-  void UpdateRow(RowSet* rowset, int row_key, int32_t new_val) {
+  void UpdateRow(RowSet* rowset, int64_t row_key, int64_t new_val) {
     ScopedOp op(&mvcc_, clock_.Now());
     op.StartApplying();
     UpdateRowInOp(rowset, op, row_key, new_val);
@@ -221,8 +221,8 @@ class TestCompaction : public KuduRowSetTest {
 
   void UpdateRowInOp(RowSet* rowset,
                      const ScopedOp& op,
-                     int row_key,
-                     int32_t new_val) {
+                     int64_t row_key,
+                     int64_t new_val) {
     ColumnId col_id = schema_.column_id(schema_.find_column("val"));
     ColumnId nullable_col_id = schema_.column_id(schema_.find_column("nullable_val"));
 
@@ -257,25 +257,25 @@ class TestCompaction : public KuduRowSetTest {
                                 &result));
   }
 
-  void DeleteRows(RowSet* rowset, int n_rows) {
+  void DeleteRows(RowSet* rowset, uint32_t n_rows) {
     DeleteRows(rowset, n_rows, 0);
   }
 
-  void DeleteRows(RowSet* rowset, int n_rows, int delta) {
-    for (uint32_t i = 0; i < n_rows; i++) {
+  void DeleteRows(RowSet* rowset, uint32_t n_rows, int delta) {
+    for (int64_t i = 0; i < n_rows; i++) {
       SCOPED_TRACE(i);
       DeleteRow(rowset, i * 10 + delta);
     }
   }
 
-  void DeleteRow(RowSet* rowset, int row_key) {
+  void DeleteRow(RowSet* rowset, int64_t row_key) {
     ScopedOp op(&mvcc_, clock_.Now());
     op.StartApplying();
     DeleteRowInOp(rowset, op, row_key);
     op.FinishApplying();
   }
 
-  void DeleteRowInOp(RowSet* rowset, const ScopedOp& op, int row_key) {
+  void DeleteRowInOp(RowSet* rowset, const ScopedOp& op, int64_t row_key) {
     char keybuf[256];
     faststring update_buf;
     snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, row_key);
@@ -308,34 +308,44 @@ class TestCompaction : public KuduRowSetTest {
   // Flush the given CompactionInput 'input' to disk with the given snapshot.
   // If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends
   // them to the vector.
-  void DoFlushAndReopen(
-      CompactionInput* input, const Schema& projection, const MvccSnapshot& snap,
-      int64_t roll_threshold, vector<shared_ptr<DiskRowSet>>* result_rowsets) {
+  Status DoFlushAndReopen(
+      CompactionInput* input,
+      const Schema& projection,
+      const MvccSnapshot& snap,
+      size_t roll_threshold,
+      vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     // Flush with a large roll threshold so we only write a single file.
     // This simplifies the test so we always need to reopen only a single rowset.
     RollingDiskRowSetWriter rsw(tablet()->metadata(), projection,
                                 Tablet::DefaultBloomSizing(),
                                 roll_threshold);
-    ASSERT_OK(rsw.Open());
-    ASSERT_OK(FlushCompactionInput(tablet()->metadata()->tablet_id(),
-                                   fs_manager()->block_manager()->error_manager(),
-                                   input, snap, HistoryGcOpts::Disabled(), &rsw));
-    ASSERT_OK(rsw.Finish());
+    RETURN_NOT_OK(rsw.Open());
+    RETURN_NOT_OK(FlushCompactionInput(
+        tablet()->metadata()->tablet_id(),
+        fs_manager()->block_manager()->error_manager(),
+        input,
+        snap,
+        HistoryGcOpts::Disabled(),
+        &rsw));
+    RETURN_NOT_OK(rsw.Finish());
 
     vector<shared_ptr<RowSetMetadata>> metas;
     rsw.GetWrittenRowSetMetadata(&metas);
     for (const auto& meta : metas) {
-      ASSERT_TRUE(meta->HasBloomDataBlockForTests());
+      if (!meta->HasBloomDataBlockForTests()) {
+        return Status::IllegalState("no bloom filter data blocks found");
+      }
     }
     if (result_rowsets) {
       // Re-open the outputs
       for (const auto& meta : metas) {
         shared_ptr<DiskRowSet> rs;
-        ASSERT_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
-                                   mem_trackers_, nullptr, &rs));
-        result_rowsets->push_back(rs);
+        RETURN_NOT_OK(DiskRowSet::Open(
+            meta, log_anchor_registry_.get(), mem_trackers_, nullptr, &rs));
+        result_rowsets->emplace_back(std::move(rs));
       }
     }
+    return Status::OK();
   }
 
   static Status BuildCompactionInput(const MvccSnapshot& merge_snap,
@@ -346,7 +356,7 @@ class TestCompaction : public KuduRowSetTest {
     for (const auto& rs : rowsets) {
       unique_ptr<CompactionInput> input;
       RETURN_NOT_OK(CompactionInput::Create(*rs, &projection, merge_snap, nullptr, &input));
-      merge_inputs.push_back(shared_ptr<CompactionInput>(input.release()));
+      merge_inputs.emplace_back(input.release());
     }
     out->reset(CompactionInput::Merge(merge_inputs, &projection));
     return Status::OK();
@@ -356,14 +366,14 @@ class TestCompaction : public KuduRowSetTest {
   // If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends
   // them to the vector.
   Status CompactAndReopen(const vector<shared_ptr<DiskRowSet>>& rowsets,
-                          const Schema& projection, int64_t roll_threshold,
+                          const Schema& projection,
+                          size_t roll_threshold,
                           vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     MvccSnapshot merge_snap(mvcc_);
     unique_ptr<CompactionInput> compact_input;
     RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection, &compact_input));
-    DoFlushAndReopen(compact_input.get(), projection, merge_snap, roll_threshold,
-                     result_rowsets);
-    return Status::OK();
+    return DoFlushAndReopen(
+        compact_input.get(), projection, merge_snap, roll_threshold, result_rowsets);
   }
 
   // Same as above, but sets a high roll threshold so it only produces a single output.
@@ -379,22 +389,29 @@ class TestCompaction : public KuduRowSetTest {
   // Flush an MRS to disk.
   // If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends
   // them to the vector.
-  void FlushMRSAndReopen(const MemRowSet& mrs, const Schema& projection,
-                         int64_t roll_threshold,
-                         vector<shared_ptr<DiskRowSet>>* result_rowsets) {
+  Status FlushMRSAndReopen(const MemRowSet& mrs,
+                           const Schema& projection,
+                           size_t roll_threshold,
+                           vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     MvccSnapshot snap(mvcc_);
     vector<shared_ptr<RowSetMetadata>> rowset_metas;
     unique_ptr<CompactionInput> input(CompactionInput::Create(mrs, &projection, snap));
-    DoFlushAndReopen(input.get(), projection, snap, roll_threshold, result_rowsets);
+    return DoFlushAndReopen(input.get(), projection, snap, roll_threshold, result_rowsets);
   }
 
   // Same as above, but sets a high roll threshold so it only produces a single output.
-  void FlushMRSAndReopenNoRoll(const MemRowSet& mrs, const Schema& projection,
-                            shared_ptr<DiskRowSet>* result_rs) {
+  Status FlushMRSAndReopenNoRoll(const MemRowSet& mrs,
+                                 const Schema& projection,
+                                 shared_ptr<DiskRowSet>* result_rs) {
     vector<shared_ptr<DiskRowSet>> rowsets;
-    FlushMRSAndReopen(mrs, projection, kLargeRollThreshold, &rowsets);
-    ASSERT_EQ(1, rowsets.size());
-    *result_rs = rowsets[0];
+    RETURN_NOT_OK(FlushMRSAndReopen(mrs, projection, kLargeRollThreshold, &rowsets));
+    if (rowsets.size() != 1) {
+      return Status::IllegalState(Substitute(
+          "got $0 disk rowsets vs 1 expected", rowsets.size()));
+
+    }
+    *result_rs = std::move(rowsets[0]);
+    return Status::OK();
   }
 
   // Create an invisible MRS -- one whose inserts and deletes were applied at
@@ -429,19 +446,18 @@ class TestCompaction : public KuduRowSetTest {
     for (const auto& schema : schemas) {
       // Create a memrowset with a bunch of rows and updates.
       shared_ptr<MemRowSet> mrs;
-      CHECK_OK(MemRowSet::Create(delta, schema, log_anchor_registry_.get(),
+      ASSERT_OK(MemRowSet::Create(delta, schema, log_anchor_registry_.get(),
                                  mem_trackers_.tablet_tracker, &mrs));
       InsertRows(mrs.get(), 1000, delta);
       UpdateRows(mrs.get(), 1000, delta, 1);
 
       // Flush it to disk and re-open it.
       shared_ptr<DiskRowSet> rs;
-      FlushMRSAndReopenNoRoll(*mrs, schema, &rs);
-      NO_FATALS();
-      rowsets.push_back(rs);
+      ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema, &rs));
+      rowsets.emplace_back(std::move(rs));
 
       // Perform some updates into DMS
-      UpdateRows(rs.get(), 1000, delta, 2);
+      UpdateRows(rowsets.back().get(), 1000, delta, 2);
       delta++;
     }
 
@@ -458,36 +474,38 @@ class TestCompaction : public KuduRowSetTest {
 
   template<bool OVERLAP_INPUTS>
   void DoBenchmark() {
-    vector<shared_ptr<DiskRowSet>> rowsets;
+    const uint32_t num_rowsets = FLAGS_merge_benchmark_num_rowsets;
+    const uint32_t num_rows_per_rowset = FLAGS_merge_benchmark_num_rows_per_rowset;
 
+    vector<shared_ptr<DiskRowSet>> rowsets;
     if (FLAGS_merge_benchmark_input_dir.empty()) {
+      rowsets.reserve(num_rowsets);
+
       // Create inputs.
-      for (int i = 0; i < FLAGS_merge_benchmark_num_rowsets; i++) {
+      for (uint32_t i = 0; i < num_rowsets; i++) {
         // Create a memrowset with a bunch of rows and updates.
         shared_ptr<MemRowSet> mrs;
-        CHECK_OK(MemRowSet::Create(i, schema_, log_anchor_registry_.get(),
-                                   mem_trackers_.tablet_tracker, &mrs));
+        ASSERT_OK(MemRowSet::Create(i, schema_, log_anchor_registry_.get(),
+                                    mem_trackers_.tablet_tracker, &mrs));
 
-        for (int n = 0; n < FLAGS_merge_benchmark_num_rows_per_rowset; n++) {
-
-          int row_key;
+        for (uint32_t n = 0; n < num_rows_per_rowset; ++n) {
+          int64_t row_key;
           if (OVERLAP_INPUTS) {
             // input 0: 0 3 6 9 ...
             // input 1: 1 4 7 10 ...
             // input 2: 2 5 8 11 ...
-            row_key = n * FLAGS_merge_benchmark_num_rowsets + i;
+            row_key = n * num_rowsets + i;
           } else {
             // input 0: 0 1 2 3
             // input 1: 1000 1001 1002 1003
             // ...
-            row_key = i * FLAGS_merge_benchmark_num_rows_per_rowset + n;
+            row_key = i * num_rows_per_rowset + n;
           }
           InsertRow(mrs.get(), row_key, n);
         }
         shared_ptr<DiskRowSet> rs;
-        FlushMRSAndReopenNoRoll(*mrs, schema_, &rs);
-        NO_FATALS();
-        rowsets.push_back(rs);
+        ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs));
+        rowsets.emplace_back(std::move(rs));
       }
     } else {
       string tablet_id = "KuduCompactionBenchTablet";
@@ -495,15 +513,19 @@ class TestCompaction : public KuduRowSetTest {
       scoped_refptr<TabletMetadata> input_meta;
       ASSERT_OK(TabletMetadata::Load(&fs_manager, tablet_id, &input_meta));
 
-      for (const auto& meta : input_meta->rowsets()) {
+      const auto& input_rowsets = input_meta->rowsets();
+      rowsets.reserve(input_rowsets.size());
+      for (const auto& meta : input_rowsets) {
         shared_ptr<DiskRowSet> rs;
-        CHECK_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
+        ASSERT_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(),
                                   mem_trackers_, nullptr, &rs));
-        rowsets.push_back(rs);
+        rowsets.emplace_back(std::move(rs));
       }
 
-      CHECK(!rowsets.empty()) << "No rowsets found in " << FLAGS_merge_benchmark_input_dir;
+      ASSERT_TRUE(!rowsets.empty())
+          << "No rowsets found in " << FLAGS_merge_benchmark_input_dir;
     }
+
     LOG(INFO) << "Beginning compaction";
     LOG_TIMING(INFO, "compacting " +
                std::string((OVERLAP_INPUTS ? "with overlap" : "without overlap"))) {
@@ -525,9 +547,9 @@ class TestCompaction : public KuduRowSetTest {
 
   // Helpers for building an expected row history.
   void AddExpectedDelete(Mutation** current_head, Timestamp ts = Timestamp::kInvalidTimestamp);
-  void AddExpectedUpdate(Mutation** current_head, int32_t val);
-  void AddExpectedReinsert(Mutation** current_head, int32_t val);
-  void AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int row_id, int32_t val);
+  void AddExpectedUpdate(Mutation** current_head, int64_t val);
+  void AddExpectedReinsert(Mutation** current_head, int64_t val);
+  void AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int64_t row_id, int64_t val);
 
  protected:
   OpId op_id_;
@@ -559,13 +581,13 @@ TEST_F(TestCompaction, TestMemRowSetInput) {
   unique_ptr<CompactionInput> input(CompactionInput::Create(*mrs, &schema_, snap));
   IterateInput(input.get(), &out);
   ASSERT_EQ(10, out.size());
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32 val=0, )"
-                "int32 nullable_val=0); Undo Mutations: [@1(DELETE)]; Redo Mutations: "
-                "[@11(SET val=1, nullable_val=1), @21(SET val=2, nullable_val=NULL)];",
+  EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 val=0, "
+            "int32 nullable_val=0); Undo Mutations: [@1(DELETE)]; Redo Mutations: "
+            "[@11(SET val=1, nullable_val=1), @21(SET val=2, nullable_val=NULL)];",
             out[0]);
-  EXPECT_EQ(R"(RowIdxInBlock: 9; Base: (string key="hello 00000090", int32 val=9, )"
-                "int32 nullable_val=NULL); Undo Mutations: [@10(DELETE)]; Redo Mutations: "
-                "[@20(SET val=1, nullable_val=1), @30(SET val=2, nullable_val=NULL)];",
+  EXPECT_EQ("RowIdxInBlock: 9; Base: (string key=\"hello 0000000090\", int64 val=9, "
+            "int32 nullable_val=NULL); Undo Mutations: [@10(DELETE)]; Redo Mutations: "
+            "[@20(SET val=1, nullable_val=1), @30(SET val=2, nullable_val=NULL)];",
             out[9]);
 }
 
@@ -578,22 +600,22 @@ TEST_F(TestCompaction, TestFlushMRSWithRolling) {
   InsertRows(mrs.get(), 30000, 0);
 
   vector<shared_ptr<DiskRowSet>> rowsets;
-  FlushMRSAndReopen(*mrs, schema_, kSmallRollThreshold, &rowsets);
+  ASSERT_OK(FlushMRSAndReopen(*mrs, schema_, kSmallRollThreshold, &rowsets));
   ASSERT_GT(rowsets.size(), 1);
 
   vector<string> rows;
   rows.reserve(30000 / 2);
   rowsets[0]->DebugDump(&rows);
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32 val=0, )"
-                "int32 nullable_val=0); Undo Mutations: [@1(DELETE)]; Redo Mutations: [];",
+  EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 val=0, "
+            "int32 nullable_val=0); Undo Mutations: [@1(DELETE)]; Redo Mutations: [];",
             rows[0]);
 
   rows.clear();
   rowsets[1]->DebugDump(&rows);
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00017150", int32 val=1715, )"
+  EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000017150\", int64 val=1715, "
             "int32 nullable_val=NULL); Undo Mutations: [@1716(DELETE)]; Redo Mutations: [];",
             rows[0]);
-  EXPECT_EQ(R"(RowIdxInBlock: 1; Base: (string key="hello 00017160", int32 val=1716, )"
+  EXPECT_EQ("RowIdxInBlock: 1; Base: (string key=\"hello 0000017160\", int64 val=1716, "
             "int32 nullable_val=1716); Undo Mutations: [@1717(DELETE)]; Redo Mutations: [];",
             rows[1]);
 }
@@ -606,8 +628,7 @@ TEST_F(TestCompaction, TestRowSetInput) {
     ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
                                 mem_trackers_.tablet_tracker, &mrs));
     InsertRows(mrs.get(), 10, 0);
-    FlushMRSAndReopenNoRoll(*mrs, schema_, &rs);
-    NO_FATALS();
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs));
   }
 
   // Update the rows in the rowset.
@@ -624,15 +645,15 @@ TEST_F(TestCompaction, TestRowSetInput) {
   ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), nullptr, &input));
   IterateInput(input.get(), &out);
   ASSERT_EQ(10, out.size());
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32 val=0, )"
-                "int32 nullable_val=0); Undo Mutations: [@1(DELETE)]; Redo Mutations: "
-                "[@11(SET val=1, nullable_val=1), @21(SET val=2, nullable_val=NULL), "
-                "@31(SET val=3, nullable_val=3), @41(SET val=4, nullable_val=NULL)];",
+  EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 val=0, "
+            "int32 nullable_val=0); Undo Mutations: [@1(DELETE)]; Redo Mutations: "
+            "[@11(SET val=1, nullable_val=1), @21(SET val=2, nullable_val=NULL), "
+            "@31(SET val=3, nullable_val=3), @41(SET val=4, nullable_val=NULL)];",
             out[0]);
-  EXPECT_EQ(R"(RowIdxInBlock: 9; Base: (string key="hello 00000090", int32 val=9, )"
-                "int32 nullable_val=NULL); Undo Mutations: [@10(DELETE)]; Redo Mutations: "
-                "[@20(SET val=1, nullable_val=1), @30(SET val=2, nullable_val=NULL), "
-                "@40(SET val=3, nullable_val=3), @50(SET val=4, nullable_val=NULL)];",
+  EXPECT_EQ("RowIdxInBlock: 9; Base: (string key=\"hello 0000000090\", int64 val=9, "
+            "int32 nullable_val=NULL); Undo Mutations: [@10(DELETE)]; Redo Mutations: "
+            "[@20(SET val=1, nullable_val=1), @30(SET val=2, nullable_val=NULL), "
+            "@40(SET val=3, nullable_val=3), @50(SET val=4, nullable_val=NULL)];",
             out[9]);
 }
 
@@ -646,8 +667,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
     ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
                                 mem_trackers_.tablet_tracker, &mrs));
     InsertRows(mrs.get(), 10, 0);
-    FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1);
-    NO_FATALS();
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1));
   }
   // Now delete the rows, this will make the rs report them as deleted and
   // so we would reinsert them into the MRS.
@@ -660,8 +680,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
                                 mem_trackers_.tablet_tracker, &mrs));
     InsertRows(mrs.get(), 10, 0);
     UpdateRows(mrs.get(), 10, 0, 1);
-    FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2);
-    NO_FATALS();
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2));
   }
   DeleteRows(rs2.get(), 10);
 
@@ -672,21 +691,17 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
                                 mem_trackers_.tablet_tracker, &mrs));
     InsertRows(mrs.get(), 10, 0);
     UpdateRows(mrs.get(), 10, 0, 2);
-    FlushMRSAndReopenNoRoll(*mrs, schema_, &rs3);
-    NO_FATALS();
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs3));
   }
 
-  shared_ptr<DiskRowSet> result;
-  vector<shared_ptr<DiskRowSet>> all_rss;
-  all_rss.push_back(rs3);
-  all_rss.push_back(rs1);
-  all_rss.push_back(rs2);
+  vector<shared_ptr<DiskRowSet>> all_rss{ rs3, rs1, rs2 };
 
   // Shuffle the row sets to make sure we test different orderings
   std::mt19937 gen(SeedRandom());
   std::shuffle(all_rss.begin(), all_rss.end(), gen);
 
   // Now compact all the drs and make sure we don't get duplicated keys on the output
+  shared_ptr<DiskRowSet> result;
   CompactAndReopenNoRoll(all_rss, schema_, &result);
 
   unique_ptr<CompactionInput> input;
@@ -698,16 +713,16 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
   vector<string> out;
   IterateInput(input.get(), &out);
   ASSERT_EQ(out.size(), 10);
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32 val=2, )"
-                "int32 nullable_val=NULL); Undo Mutations: [@61(SET val=0, nullable_val=0), "
-                "@51(DELETE), @41(REINSERT val=1, nullable_val=1), @31(SET val=0, nullable_val=0), "
-                "@21(DELETE), @11(REINSERT val=0, nullable_val=0), @1(DELETE)]; "
-                "Redo Mutations: [];", out[0]);
-  EXPECT_EQ(R"(RowIdxInBlock: 9; Base: (string key="hello 00000090", int32 val=2, )"
-                "int32 nullable_val=NULL); Undo Mutations: [@70(SET val=9, nullable_val=NULL), "
-                "@60(DELETE), @50(REINSERT val=1, nullable_val=1), @40(SET val=9, "
-                "nullable_val=NULL), @30(DELETE), @20(REINSERT val=9, nullable_val=NULL), "
-                "@10(DELETE)]; Redo Mutations: [];", out[9]);
+  EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 val=2, "
+            "int32 nullable_val=NULL); Undo Mutations: [@61(SET val=0, nullable_val=0), "
+            "@51(DELETE), @41(REINSERT val=1, nullable_val=1), @31(SET val=0, nullable_val=0), "
+            "@21(DELETE), @11(REINSERT val=0, nullable_val=0), @1(DELETE)]; "
+            "Redo Mutations: [];", out[0]);
+  EXPECT_EQ("RowIdxInBlock: 9; Base: (string key=\"hello 0000000090\", int64 val=2, "
+            "int32 nullable_val=NULL); Undo Mutations: [@70(SET val=9, nullable_val=NULL), "
+            "@60(DELETE), @50(REINSERT val=1, nullable_val=1), @40(SET val=9, "
+            "nullable_val=NULL), @30(DELETE), @20(REINSERT val=9, nullable_val=NULL), "
+            "@10(DELETE)]; Redo Mutations: [];", out[9]);
 }
 
 void TestCompaction::AddExpectedDelete(Mutation** current_head, Timestamp ts) {
@@ -722,7 +737,7 @@ void TestCompaction::AddExpectedDelete(Mutation** current_head, Timestamp ts) {
   *current_head = mutation;
 }
 
-void TestCompaction::AddExpectedUpdate(Mutation** current_head, int32_t val) {
+void TestCompaction::AddExpectedUpdate(Mutation** current_head, int64_t val) {
   faststring buf;
   RowChangeListEncoder enc(&buf);
   enc.SetToUpdate();
@@ -739,7 +754,7 @@ void TestCompaction::AddExpectedUpdate(Mutation** current_head, int32_t val) {
   *current_head = mutation;
 }
 
-void TestCompaction::AddExpectedReinsert(Mutation** current_head, int32_t val) {
+void TestCompaction::AddExpectedReinsert(Mutation** current_head, int64_t val) {
   faststring buf;
   RowChangeListEncoder enc(&buf);
   enc.SetToReinsert();
@@ -756,8 +771,8 @@ void TestCompaction::AddExpectedReinsert(Mutation** current_head, int32_t val) {
   *current_head = mutation;
 }
 
-void TestCompaction::AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int row_id,
-                                        int32_t val) {
+void TestCompaction::AddUpdateAndDelete(
+    RowSet* rs, CompactionInputRow* row, int64_t row_id, int64_t val) {
   UpdateRow(rs, row_id, val);
   // Expect an UNDO update for the update.
   AddExpectedUpdate(&row->undo_head, row_id);
@@ -794,7 +809,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
   vector<shared_ptr<DiskRowSet>> row_sets;
 
   // Create a vector of ids for rows and fill it for the first layer.
-  vector<int> row_ids(total_num_rows);
+  vector<int64_t> row_ids(total_num_rows);
   std::iota(row_ids.begin(), row_ids.end(), 0);
 
   SeedRandom();
@@ -809,23 +824,27 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
 
       // For even rows, insert, update and delete them in the mrs.
       for (int k = 0; k < kNumRowsPerRowSet; ++k) {
-        int row_id = row_ids[row_idx + k];
+        int64_t row_id = row_ids[row_idx + k];
         CompactionInputRow* row = &expected_rows[row_id];
         InsertRow(mrs.get(), row_id, row_id);
         // Expect an UNDO delete for the insert.
         AddExpectedDelete(&row->undo_head);
-        if (row_id % 2 == 0) AddUpdateAndDelete(mrs.get(), row, row_id, row_id + i + 1);
+        if (row_id % 2 == 0) {
+          AddUpdateAndDelete(mrs.get(), row, row_id, row_id + i + 1);
+        }
       }
       shared_ptr<DiskRowSet> drs;
-      FlushMRSAndReopenNoRoll(*mrs, schema_, &drs);
+      ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &drs));
       // For odd rows, update them and delete them in the drs.
       for (int k = 0; k < kNumRowsPerRowSet; ++k) {
-        int row_id = row_ids[row_idx];
+        int64_t row_id = row_ids[row_idx];
         CompactionInputRow* row = &expected_rows[row_id];
-        if (row_id % 2 == 1) AddUpdateAndDelete(drs.get(), row, row_id, row_id + i + 1);
+        if (row_id % 2 == 1) {
+          AddUpdateAndDelete(drs.get(), row, row_id, row_id + i + 1);
+        }
         row_idx++;
       }
-      row_sets.push_back(drs);
+      row_sets.emplace_back(std::move(drs));
       rs_id++;
     }
     // For the next layer remove one rowset worth of rows at random.
@@ -841,7 +860,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
   // Go through the expected compaction input rows, flip the last undo into a redo and
   // build the base. This will give us the final version that we'll expect the result
   // of the real compaction to match.
-  for (int i = 0; i < expected_rows.size(); ++i) {
+  for (size_t i = 0; i < expected_rows.size(); ++i) {
     CompactionInputRow* row = &expected_rows[i];
     Mutation* reinsert = row->undo_head;
     row->undo_head = reinsert->next();
@@ -849,7 +868,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
     BuildRow(i, i);
     CopyRow(row_builder_.row(), &row->row, &mem.arena);
     RowChangeListDecoder redo_decoder(reinsert->changelist());
-    CHECK_OK(redo_decoder.Init());
+    ASSERT_OK(redo_decoder.Init());
     faststring buf;
     RowChangeListEncoder dummy(&buf);
     dummy.SetToUpdate();
@@ -860,35 +879,36 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
   vector<shared_ptr<CompactionInput>> inputs;
   for (const auto& row_set : row_sets) {
     unique_ptr<CompactionInput> ci;
-    CHECK_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
-    inputs.push_back(shared_ptr<CompactionInput>(ci.release()));
+    ASSERT_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
+    inputs.emplace_back(ci.release());
   }
 
   // Compact the row sets by picking a few at random until we're left with just one.
+  std::mt19937 gen(SeedRandom());
   while (row_sets.size() > 1) {
-    std::mt19937 gen(SeedRandom());
     std::shuffle(row_sets.begin(), row_sets.end(), gen);
     // Merge between 2 and 4 row sets.
-    int num_rowsets_to_merge = std::min(rand() % 3 + 2, static_cast<int>(row_sets.size()));
+    const size_t num_rowsets_to_merge =
+        std::min(static_cast<size_t>(rand()) % 3 + 2, row_sets.size());
     vector<shared_ptr<DiskRowSet>> to_merge;
-    for (int i = 0; i < num_rowsets_to_merge; ++i) {
-      to_merge.push_back(row_sets.back());
+    for (size_t i = 0; i < num_rowsets_to_merge; ++i) {
+      to_merge.emplace_back(std::move(row_sets.back()));
       row_sets.pop_back();
     }
     shared_ptr<DiskRowSet> result;
     CompactAndReopenNoRoll(to_merge, schema_, &result);
-    row_sets.push_back(result);
+    row_sets.emplace_back(std::move(result));
   }
 
   vector<string> out;
   unique_ptr<CompactionInput> ci;
-  CHECK_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
+  ASSERT_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
   IterateInput(ci.get(), &out);
 
   // Finally go through the final compaction input and through the expected one and make sure
   // they match.
   ASSERT_EQ(expected_rows.size(), out.size());
-  for (int i = 0; i < expected_rows.size(); ++i) {
+  for (size_t i = 0; i < expected_rows.size(); ++i) {
     EXPECT_EQ(CompactionInputRowToString(expected_rows[i]), out[i]);
   }
 }
@@ -903,8 +923,7 @@ TEST_F(TestCompaction, TestMRSCompactionDoesntOutputUnobservableRows) {
     ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
                                 mem_trackers_.tablet_tracker, &mrs));
     InsertRow(mrs.get(), 1, 1);
-    FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1);
-    NO_FATALS();
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1));
   }
 
   // Now make the row a ghost in rs1 in the same op as we reinsert it in the mrs then
@@ -924,21 +943,22 @@ TEST_F(TestCompaction, TestMRSCompactionDoesntOutputUnobservableRows) {
 
     InsertRowInOp(mrs.get(), op, 2, 0);
     op.FinishApplying();
-    FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2);
-    NO_FATALS();
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2));
   }
 
   MvccSnapshot all_snap = MvccSnapshot::CreateSnapshotIncludingAllOps();
 
-  unique_ptr<CompactionInput> rs1_input;
-  ASSERT_OK(CompactionInput::Create(*rs1, &schema_, all_snap, nullptr, &rs1_input));
+  vector<shared_ptr<CompactionInput>> to_merge;
+  {
+    unique_ptr<CompactionInput> rs1_input;
+    ASSERT_OK(CompactionInput::Create(*rs1, &schema_, all_snap, nullptr, &rs1_input));
 
-  unique_ptr<CompactionInput> rs2_input;
-  ASSERT_OK(CompactionInput::Create(*rs2, &schema_, all_snap, nullptr, &rs2_input));
+    unique_ptr<CompactionInput> rs2_input;
+    ASSERT_OK(CompactionInput::Create(*rs2, &schema_, all_snap, nullptr, &rs2_input));
 
-  vector<shared_ptr<CompactionInput>> to_merge;
-  to_merge.push_back(shared_ptr<CompactionInput>(rs1_input.release()));
-  to_merge.push_back(shared_ptr<CompactionInput>(rs2_input.release()));
+    to_merge.emplace_back(rs1_input.release());
+    to_merge.emplace_back(rs2_input.release());
+  }
 
   unique_ptr<CompactionInput> merged(CompactionInput::Merge(to_merge, &schema_));
 
@@ -947,11 +967,11 @@ TEST_F(TestCompaction, TestMRSCompactionDoesntOutputUnobservableRows) {
   vector<string> out;
   IterateInput(merged.get(), &out);
   EXPECT_EQ(out.size(), 2);
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000001", int32 val=1, )"
-                "int32 nullable_val=NULL); Undo Mutations: [@1(DELETE)]; Redo Mutations: "
-                "[@2(DELETE)];", out[0]);
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000002", int32 val=0, )"
-                "int32 nullable_val=0); Undo Mutations: [@2(DELETE)]; Redo Mutations: [];", out[1]);
+  EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000001\", int64 val=1, "
+            "int32 nullable_val=NULL); Undo Mutations: [@1(DELETE)]; Redo Mutations: "
+            "[@2(DELETE)];", out[0]);
+  EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000002\", int64 val=0, "
+            "int32 nullable_val=0); Undo Mutations: [@2(DELETE)]; Redo Mutations: [];", out[1]);
 }
 
 // Test case which doesn't do any merging -- just compacts
@@ -968,8 +988,7 @@ TEST_F(TestCompaction, TestOneToOne) {
 
   // Flush it to disk and re-open.
   shared_ptr<DiskRowSet> rs;
-  FlushMRSAndReopenNoRoll(*mrs, schema_, &rs);
-  NO_FATALS();
+  ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs));
 
   // Update the rows with some updates that weren't in the snapshot.
   UpdateRows(mrs.get(), 1000, 0, 2);
@@ -991,16 +1010,16 @@ TEST_F(TestCompaction, TestOneToOne) {
   ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), nullptr, &input));
   IterateInput(input.get(), &out);
   ASSERT_EQ(1000, out.size());
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32 val=1, )"
-                "int32 nullable_val=1); Undo Mutations: [@1001(SET val=0, nullable_val=0), "
-                "@1(DELETE)]; Redo Mutations: [@2001(SET val=2, nullable_val=NULL), "
-                "@3001(SET val=3, nullable_val=3)];", out[0]);
+  EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 val=1, "
+            "int32 nullable_val=1); Undo Mutations: [@1001(SET val=0, nullable_val=0), "
+            "@1(DELETE)]; Redo Mutations: [@2001(SET val=2, nullable_val=NULL), "
+            "@3001(SET val=3, nullable_val=3)];", out[0]);
 
   // And compact (1 input to 1 output)
   MvccSnapshot snap3(mvcc_);
   unique_ptr<CompactionInput> compact_input;
   ASSERT_OK(CompactionInput::Create(*rs, &schema_, snap3, nullptr, &compact_input));
-  DoFlushAndReopen(compact_input.get(), schema_, snap3, kLargeRollThreshold, nullptr);
+  ASSERT_OK(DoFlushAndReopen(compact_input.get(), schema_, snap3, kLargeRollThreshold, nullptr));
 }
 
 // Test merging two row sets and the second one has updates, KUDU-102
@@ -1013,8 +1032,7 @@ TEST_F(TestCompaction, TestKUDU102) {
                               mem_trackers_.tablet_tracker, &mrs));
   InsertRows(mrs.get(), 10, 0);
   shared_ptr<DiskRowSet> rs;
-  FlushMRSAndReopenNoRoll(*mrs, schema_, &rs);
-  NO_FATALS();
+  ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs));
 
   shared_ptr<MemRowSet> mrs_b;
   ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
@@ -1022,8 +1040,7 @@ TEST_F(TestCompaction, TestKUDU102) {
   InsertRows(mrs_b.get(), 10, 100);
   MvccSnapshot snap(mvcc_);
   shared_ptr<DiskRowSet> rs_b;
-  FlushMRSAndReopenNoRoll(*mrs_b, schema_, &rs_b);
-  NO_FATALS();
+  ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs_b, schema_, &rs_b));
 
   // Update all the rows in the second row set
   UpdateRows(mrs_b.get(), 10, 100, 2);
@@ -1032,10 +1049,8 @@ TEST_F(TestCompaction, TestKUDU102) {
   // Note that we are merging two MRS, it's a hack
   MvccSnapshot snap2(mvcc_);
   vector<shared_ptr<CompactionInput>> merge_inputs;
-  merge_inputs.push_back(
-        shared_ptr<CompactionInput>(CompactionInput::Create(*mrs, &schema_, snap2)));
-  merge_inputs.push_back(
-        shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap2)));
+  merge_inputs.emplace_back(CompactionInput::Create(*mrs, &schema_, snap2));
+  merge_inputs.emplace_back(CompactionInput::Create(*mrs_b, &schema_, snap2));
   unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
 
   string dummy_name = "";
@@ -1047,30 +1062,27 @@ TEST_F(TestCompaction, TestKUDU102) {
 
 // Test compacting when all of the inputs and the output have the same schema
 TEST_F(TestCompaction, TestMerge) {
-  vector<Schema> schemas;
-  schemas.push_back(schema_);
-  schemas.push_back(schema_);
-  schemas.push_back(schema_);
-  DoMerge(schemas.back(), schemas);
+  vector<Schema> schemas{ schema_, schema_, schema_ };
+  NO_FATALS(DoMerge(schemas.back(), schemas));
 }
 
 // test compacting when the inputs have different base schemas
 TEST_F(TestCompaction, TestMergeMultipleSchemas) {
-  vector<Schema> schemas;
   SchemaBuilder builder(schema_);
-  schemas.push_back(schema_);
+  vector<Schema> schemas;
+  schemas.emplace_back(schema_);
 
   // Add an int column with default
   int32_t default_c2 = 10;
-  CHECK_OK(builder.AddColumn("c2", INT32, false, &default_c2, &default_c2));
-  schemas.push_back(builder.Build());
+  ASSERT_OK(builder.AddColumn("c2", INT32, false, &default_c2, &default_c2));
+  schemas.emplace_back(builder.Build());
 
   // add a string column with default
   Slice default_c3("Hello World");
-  CHECK_OK(builder.AddColumn("c3", STRING, false, &default_c3, &default_c3));
-  schemas.push_back(builder.Build());
+  ASSERT_OK(builder.AddColumn("c3", STRING, false, &default_c3, &default_c3));
+  schemas.emplace_back(builder.Build());
 
-  DoMerge(schemas.back(), schemas);
+  NO_FATALS(DoMerge(schemas.back(), schemas));
 }
 
 // Test MergeCompactionInput against MemRowSets.
@@ -1100,9 +1112,8 @@ TEST_F(TestCompaction, TestMergeMRS) {
   };
   unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
-  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
-  uint64_t total_num_rows = CountRows(result_rs);
-  ASSERT_EQ(20, total_num_rows);
+  ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs));
+  ASSERT_EQ(20, CountRows(result_rs));
 }
 
 // Test MergeCompactionInput against MemRowSets, where there are rows that were
@@ -1120,7 +1131,7 @@ TEST_F(TestCompaction, TestMergeMRSWithInvisibleRows) {
   };
   unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
-  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
+  ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs));
   ASSERT_EQ(1, result_rs.size());
   ASSERT_EQ(10, CountRows(result_rs));
 }
@@ -1200,7 +1211,7 @@ TEST_F(TestCompaction, TestRandomizeDuplicatedRowsAcrossTransactions) {
   }
   unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
-  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
+  ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs));
   ASSERT_EQ(1, result_rs.size());
   ASSERT_EQ(mrs_with_live_row ? 1 : 0, CountRows(result_rs));
 }
@@ -1246,7 +1257,7 @@ TEST_F(TestCompaction, TestRowHistoryJumpsBetweenRowsets) {
   };
   unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
-  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
+  ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs));
   ASSERT_EQ(1, result_rs.size());
   ASSERT_EQ(0, CountRows(result_rs));
 }
@@ -1262,7 +1273,7 @@ TEST_F(TestCompaction, TestMergeMRSWithAllInvisibleRows) {
   };
   unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
-  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
+  ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs));
   ASSERT_TRUE(result_rs.empty());
 }
 
@@ -1302,7 +1313,7 @@ TEST_F(TestCompaction, TestConcurrentCompactionRowSetPicking) {
     for (int j = 0; j < kNumRowsPerRowSet; j++) {
       const int val = i + j * 10;
       ASSERT_OK(row.SetStringCopy("key", Substitute("hello $0", val)));
-      ASSERT_OK(row.SetInt32("val", val));
+      ASSERT_OK(row.SetInt64("val", val));
       ASSERT_OK(writer.Insert(row));
     }
     ASSERT_OK(tablet()->Flush());
@@ -1339,7 +1350,7 @@ TEST_F(TestCompaction, TestCompactionFreesDiskSpace) {
       for (int j = 0; j < 10; j++) {
         int val = (i * 10) + j;
         ASSERT_OK(row.SetStringCopy("key", Substitute("hello $0", val)));
-        ASSERT_OK(row.SetInt32("val", val));
+        ASSERT_OK(row.SetInt64("val", val));
         ASSERT_OK(writer.Insert(row));
       }
       ASSERT_OK(tablet()->Flush());
@@ -1376,7 +1387,7 @@ TEST_F(TestCompaction, TestCompactionFreesDiskSpace) {
 TEST_F(TestCompaction, TestEmptyFlushDoesntLeakBlocks) {
   if (FLAGS_block_manager != "log") {
     LOG(WARNING) << "Test requires the log block manager";
-    return;
+    GTEST_SKIP();
   }
 
   // Fetch the metric for the number of on-disk blocks, so we can later verify
@@ -1411,7 +1422,7 @@ TEST_F(TestCompaction, TestCountLiveRowsOfMemRowSetFlush) {
   ASSERT_EQ(100 - 50 + 10, count);
 
   shared_ptr<DiskRowSet> rs;
-  NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs));
+  ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs));
   ASSERT_OK(rs->CountLiveRows(&count));
   ASSERT_EQ(100 - 50 + 10, count);
 }
@@ -1426,7 +1437,7 @@ TEST_F(TestCompaction, TestCountLiveRowsOfDiskRowSetsCompact) {
     NO_FATALS(UpdateRows(mrs.get(), 80, 0, 1));
     NO_FATALS(DeleteRows(mrs.get(), 50, 0));
     NO_FATALS(InsertRows(mrs.get(), 10, 0));
-    NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1));
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1));
   }
   shared_ptr<DiskRowSet> rs2;
   {
@@ -1437,7 +1448,7 @@ TEST_F(TestCompaction, TestCountLiveRowsOfDiskRowSetsCompact) {
     NO_FATALS(UpdateRows(mrs.get(), 80, 1, 1));
     NO_FATALS(DeleteRows(mrs.get(), 50, 1));
     NO_FATALS(InsertRows(mrs.get(), 10, 1));
-    NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2));
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2));
   }
   shared_ptr<DiskRowSet> rs3;
   {
@@ -1448,7 +1459,7 @@ TEST_F(TestCompaction, TestCountLiveRowsOfDiskRowSetsCompact) {
     NO_FATALS(UpdateRows(mrs.get(), 80, 2, 2));
     NO_FATALS(DeleteRows(mrs.get(), 50, 2));
     NO_FATALS(InsertRows(mrs.get(), 10, 2));
-    NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs3));
+    ASSERT_OK(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs3));
   }
 
   shared_ptr<DiskRowSet> result;