You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/08/21 17:59:31 UTC

[kudu] branch master updated (413396c -> b9c429c)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 413396c  [docs] Fix an outdated slow DNS log description
     new 0c9204c  [mini_hms] remove unused duplicate from env_vars
     new b9c429c  [tablet] Fixed the bug of DeltaTracker::CountDeletedRows

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/hms/mini_hms.cc                           |   1 -
 src/kudu/integration-tests/raft_consensus-itest.cc |  36 ++++-
 src/kudu/integration-tests/test_workload.cc        | 153 ++++++++++++---------
 src/kudu/integration-tests/test_workload.h         |  20 +++
 src/kudu/tablet/delta_tracker.cc                   |  15 +-
 src/kudu/tablet/delta_tracker.h                    |   2 +-
 src/kudu/tablet/diskrowset.cc                      |   2 +-
 src/kudu/tablet/metadata-test.cc                   |   2 +-
 src/kudu/tablet/mt-tablet-test.cc                  |   9 +-
 src/kudu/tablet/rowset_metadata.cc                 |  10 +-
 src/kudu/tablet/rowset_metadata.h                  |   8 +-
 11 files changed, 177 insertions(+), 81 deletions(-)


[kudu] 01/02: [mini_hms] remove unused duplicate from env_vars

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0c9204c41cad73e64ef5427c67034f0a6a2265e9
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Aug 20 14:45:00 2019 -0700

    [mini_hms] remove unused duplicate from env_vars
    
    Since the code was changed in 76b80ec01, the duplicate entry for
    the "HADOOP_CONF_DIR" environment variable has no longer had any
    effect on the contents of the 'env_vars' map.
    
    This patch does not contain any functional modifications.
    
    Change-Id: I3a7fb897aac5671a6938b73c66aac2d5d93fe8f7
    Reviewed-on: http://gerrit.cloudera.org:8080/14109
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/hms/mini_hms.cc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc
index 98db795..1174229 100644
--- a/src/kudu/hms/mini_hms.cc
+++ b/src/kudu/hms/mini_hms.cc
@@ -153,7 +153,6 @@ Status MiniHms::Start() {
       { "HIVE_AUX_JARS_PATH", aux_jars },
       { "HIVE_CONF_DIR", data_root_ },
       { "JAVA_TOOL_OPTIONS", java_options },
-      { "HADOOP_CONF_DIR", data_root_ },
       // Set HADOOP_OS_TYPE=Linux due to HADOOP-8719.
       // TODO(ghenke): Remove after HADOOP-15966 is available (Hadoop 3.1.3+)
       { "HADOOP_OS_TYPE", "Linux" }


[kudu] 02/02: [tablet] Fixed the bug of DeltaTracker::CountDeletedRows

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b9c429c6831cf7aa948490136eeedc984cf7e4c1
Author: oclarms <oc...@gmail.com>
AuthorDate: Wed Aug 14 11:43:51 2019 +0800

    [tablet] Fixed the bug of DeltaTracker::CountDeletedRows
    
    When Tablet::CountLiveRows was called concurrently from multiple
    threads, the following failure could occur:
    
    User stack:
    F0814 12:05:51.975797 96375 diskrowset.cc:759] Check failed: *count >= 0 (-3 vs. 0)
    *** Check failure stack trace: ***
    *** Aborted at 1565755551 (unix time) try "date -d @1565755551" if you are using GNU date ***
    PC: @     0x7f9bd20425f7 __GI_raise
    *** SIGABRT (@0x70900017872) received by PID 96370 (TID 0x7f9bce2d7700) from PID 96370; stack trace: ***
        @     0x7f9bdaff6100 (unknown)
        @     0x7f9bd20425f7 __GI_raise
        @     0x7f9bd2043ce8 __GI_abort
        @     0x7f9bd4540c99 google::logging_fail()
        @     0x7f9bd454246d google::LogMessage::Fail()
        @     0x7f9bd45443c3 google::LogMessage::SendToLog()
        @     0x7f9bd4541fc9 google::LogMessage::Flush()
        @     0x7f9bd4544d4f google::LogMessageFatal::~LogMessageFatal()
        @     0x7f9bddc9aabe kudu::tablet::DiskRowSet::CountLiveRows()
        @     0x7f9bddbdeb79 kudu::tablet::Tablet::CountLiveRows()
        @           0x49891f kudu::tablet::MultiThreadedTabletTest<>::CollectStatisticsThread()
        @           0x4ae34b boost::_mfi::mf1<>::operator()()
        @           0x4add25 boost::_bi::list2<>::operator()<>()
        @           0x4acfe9 boost::_bi::bind_t<>::operator()()
        @           0x4ac8a6 boost::detail::function::void_function_obj_invoker0<>::invoke()
        @     0x7f9bd7116492 boost::function0<>::operator()()
        @     0x7f9bd62e5324 kudu::Thread::SuperviseThread()
        @     0x7f9bdafeedc5 start_thread
        @     0x7f9bd2103ced __clone
    
    This is because DeltaTracker lacked lock protection around updating the
    number of live rows in rowset_metadata_ and resetting deleted_row_count_.
    As a result, deleted_row_count_ could be subtracted twice when calculating
    the number of live rows of a DiskRowSet (DRS). Consider the following
    sequence:
    | T1                                | T2
    |----------                         |----------
    |+ In DT::Flush                     |
    |  Take compact_flush_lock_ (excl)  |
    |  Take component_lock_ (excl)      |
    |  deleted_row_count_ = ...         |
    |  Release component_lock_          |
    |  + In DT::FlushDMS                |
    |    Call RSMD::IncrementLiveRows   |
    |    --> RSMD::live_row_count - deleted_row_count_
    |                                   |+ In DRS::CountLiveRows
    |                                   |  Take component_lock_ (shared)
    |                                   |  Call RSMD::live_row_count - DT::CountDeletedRows
    |                                   |  --> RSMD::live_row_count - deleted_row_count_
    |                                   |  --> we double counted deleted_row_count_ !!!
    |  Take component_lock_ (excl)      |
    |  deleted_row_count_ = 0           |
    |  Release component_lock_          |
    |  Release compact_flush_lock_      |
    
    Change-Id: I9bb4456123087778c9dc799777c5990938a84fdf
    Reviewed-on: http://gerrit.cloudera.org:8080/14061
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/integration-tests/raft_consensus-itest.cc |  36 ++++-
 src/kudu/integration-tests/test_workload.cc        | 153 ++++++++++++---------
 src/kudu/integration-tests/test_workload.h         |  20 +++
 src/kudu/tablet/delta_tracker.cc                   |  15 +-
 src/kudu/tablet/delta_tracker.h                    |   2 +-
 src/kudu/tablet/diskrowset.cc                      |   2 +-
 src/kudu/tablet/metadata-test.cc                   |   2 +-
 src/kudu/tablet/mt-tablet-test.cc                  |   9 +-
 src/kudu/tablet/rowset_metadata.cc                 |  10 +-
 src/kudu/tablet/rowset_metadata.h                  |   8 +-
 10 files changed, 177 insertions(+), 80 deletions(-)

diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 4e77708..0c2c114 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -220,7 +220,7 @@ class RaftConsensusITest : public RaftConsensusITestBase {
   void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers,
                                                    const string& leader_uuid);
 
-  void CreateClusterForCrashyNodesTests();
+  void CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags = {});
   void DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert);
 
   // Prepare for a test where a single replica of a 3-server cluster is left
@@ -528,7 +528,7 @@ void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
                                MonoDelta::FromSeconds(10)));
 }
 
-void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
+void RaftConsensusITest::CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags) {
   if (AllowSlowTests()) {
     FLAGS_num_tablet_servers = 7;
     FLAGS_num_replicas = 7;
@@ -555,6 +555,8 @@ void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
   // log area.
   ts_flags.emplace_back("--log_preallocate_segments=false");
 
+  ts_flags.insert(ts_flags.end(), extra_ts_flags.begin(), extra_ts_flags.end());
+
   NO_FATALS(CreateCluster("raft_consensus-itest-crashy-nodes-cluster",
                           std::move(ts_flags)));
 }
@@ -608,7 +610,7 @@ void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_
   NO_FATALS(v.CheckCluster());
   NO_FATALS(v.CheckRowCount(workload->table_name(),
                             ClusterVerifier::EXACTLY,
-                            workload->rows_inserted()));
+                            workload->rows_inserted() - workload->rows_deleted()));
 }
 
 void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
@@ -952,6 +954,34 @@ TEST_F(RaftConsensusITest, InsertDuplicateKeysWithCrashyNodes) {
   NO_FATALS(DoTestCrashyNodes(&workload, 300));
 }
 
+// The same crashy nodes test as above but the keys will be deleted after insertion.
+TEST_F(RaftConsensusITest, InsertAndDeleteWithCrashyNodes) {
+  vector<string> extra_ts_flags = {
+      "--flush_threshold_mb=0",
+      "--flush_threshold_secs=1",
+      "--maintenance_manager_polling_interval_ms=10",
+      "--heartbeat_interval_ms=10",
+      "--update_tablet_stats_interval_ms=10",
+  };
+  NO_FATALS(CreateClusterForCrashyNodesTests(std::move(extra_ts_flags)));
+
+  // If the AllowSlowTests is true, test the scenario that deleting data on DRS.
+  // Otherwise, test deleting data on MRS.
+  int32_t write_interval_millis = 0;
+  int32_t write_batch_size = 5;
+  if (AllowSlowTests()) {
+    // Wait for MRS to be flushed.
+    write_interval_millis = 1000;
+    // Decrease the number of rows per batch to generate more DRSs.
+    write_batch_size = 1;
+  }
+
+  TestWorkload workload(cluster_.get());
+  workload.set_write_pattern(TestWorkload::INSERT_RANDOM_ROWS_WITH_DELETE);
+  workload.set_write_interval_millis(write_interval_millis);
+  workload.set_write_batch_size(write_batch_size);
+  NO_FATALS(DoTestCrashyNodes(&workload, 100));
+}
 
 TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
   int kNumElections = FLAGS_num_replicas;
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 7b9863e..eb2bb70 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -17,7 +17,6 @@
 
 #include "kudu/integration-tests/test_workload.h"
 
-#include <cstddef>
 #include <memory>
 #include <ostream>
 
@@ -43,6 +42,8 @@ namespace kudu {
 
 using client::KuduClient;
 using client::KuduColumnSchema;
+using client::KuduDelete;
+using client::KuduError;
 using client::KuduInsert;
 using client::KuduScanBatch;
 using client::KuduScanner;
@@ -54,6 +55,8 @@ using client::KuduUpdate;
 using client::sp::shared_ptr;
 using cluster::MiniCluster;
 
+using std::vector;
+
 const char* const TestWorkload::kDefaultTableName = "test-workload";
 
 TestWorkload::TestWorkload(MiniCluster* cluster)
@@ -65,6 +68,7 @@ TestWorkload::TestWorkload(MiniCluster* cluster)
     // high-stress workloads.
     read_timeout_millis_(60000),
     write_batch_size_(50),
+    write_interval_millis_(0),
     write_timeout_millis_(20000),
     timeout_allowed_(false),
     not_found_allowed_(false),
@@ -77,6 +81,7 @@ TestWorkload::TestWorkload(MiniCluster* cluster)
     start_latch_(0),
     should_run_(false),
     rows_inserted_(0),
+    rows_deleted_(0),
     batches_completed_(0),
     sequential_key_gen_(0) {
   // Make the default write pattern random row inserts.
@@ -87,11 +92,11 @@ TestWorkload::~TestWorkload() {
   StopAndJoin();
 }
 
-void TestWorkload::set_schema(const client::KuduSchema& schema) {
+void TestWorkload::set_schema(const KuduSchema& schema) {
   // Do some sanity checks on the schema. They reflect how the rest of
   // TestWorkload is going to use the schema.
   CHECK_GT(schema.num_columns(), 0) << "Schema should have at least one column";
-  std::vector<int> key_indexes;
+  vector<int> key_indexes;
   schema.GetPrimaryKeyColumnIndexes(&key_indexes);
   CHECK_LE(1, key_indexes.size()) << "Schema should have at least one key column";
   CHECK_EQ(0, key_indexes[0]) << "Schema's first key column should be index 0";
@@ -137,73 +142,69 @@ void TestWorkload::WriteThread() {
 
   while (should_run_.Load()) {
     int inserted = 0;
-    for (int i = 0; i < write_batch_size_; i++) {
-      if (write_pattern_ == UPDATE_ONE_ROW) {
-        gscoped_ptr<KuduUpdate> update(table->NewUpdate());
-        KuduPartialRow* row = update->mutable_row();
-        tools::GenerateDataForRow(schema_, 0, &rng_, row);
-        CHECK_OK(session->Apply(update.release()));
-      } else {
-        gscoped_ptr<KuduInsert> insert(table->NewInsert());
-        KuduPartialRow* row = insert->mutable_row();
-        int32_t key;
-        if (write_pattern_ == INSERT_SEQUENTIAL_ROWS) {
-          key = sequential_key_gen_.Increment();
+    int deleted = 0;
+    vector<int32_t> keys;
+    // Write insert or update row to cluster.
+    {
+      for (int i = 0; i < write_batch_size_; i++) {
+        if (write_pattern_ == UPDATE_ONE_ROW) {
+          gscoped_ptr<KuduUpdate> update(table->NewUpdate());
+          KuduPartialRow* row = update->mutable_row();
+          tools::GenerateDataForRow(schema_, 0, &rng_, row);
+          CHECK_OK(session->Apply(update.release()));
         } else {
-          key = rng_.Next();
-          if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) {
-            key %= kNumRowsForDuplicateKeyWorkload;
+          gscoped_ptr<KuduInsert> insert(table->NewInsert());
+          KuduPartialRow* row = insert->mutable_row();
+          int32_t key;
+          if (write_pattern_ == INSERT_SEQUENTIAL_ROWS) {
+            key = sequential_key_gen_.Increment();
+          } else {
+            key = rng_.Next();
+            if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) {
+              key %= kNumRowsForDuplicateKeyWorkload;
+            }
           }
-        }
-
-        tools::GenerateDataForRow(schema_, key, &rng_, row);
-        if (payload_bytes_) {
-          // Note: overriding payload_bytes_ requires the "simple" schema.
-          std::string test_payload(payload_bytes_.get(), '0');
-          CHECK_OK(row->SetStringCopy(2, test_payload));
-        }
-        CHECK_OK(session->Apply(insert.release()));
-        inserted++;
-      }
-    }
-
-    Status s = session->Flush();
-
-    if (PREDICT_FALSE(!s.ok())) {
-      std::vector<client::KuduError*> errors;
-      ElementDeleter d(&errors);
-      bool overflow;
-      session->GetPendingErrors(&errors, &overflow);
-      CHECK(!overflow);
-      for (const client::KuduError* e : errors) {
-        if (timeout_allowed_ && e->status().IsTimedOut()) {
-          continue;
-        }
-
-        if (not_found_allowed_ && e->status().IsNotFound()) {
-          continue;
-        }
+          keys.push_back(key);
 
-        if (already_present_allowed_ && e->status().IsAlreadyPresent()) {
-          continue;
-        }
-
-        if (network_error_allowed_ && e->status().IsNetworkError()) {
-          continue;
-        }
+          tools::GenerateDataForRow(schema_, key, &rng_, row);
+          if (payload_bytes_) {
+            // Note: overriding payload_bytes_ requires the "simple" schema.
+            std::string test_payload(payload_bytes_.get(), '0');
+            CHECK_OK(row->SetStringCopy(2, test_payload));
+          }
+          CHECK_OK(session->Apply(insert.release()));
 
-        if (remote_error_allowed_ && e->status().IsRemoteError()) {
-          continue;
+          inserted++;
         }
-
-        LOG(FATAL) << e->status().ToString();
       }
-      inserted -= errors.size();
+      Status s = session->Flush();
+      if (PREDICT_FALSE(!s.ok())) {
+        inserted -= GetNumberOfErrors(session.get());
+      }
+      if (inserted > 0) {
+        rows_inserted_.IncrementBy(inserted);
+        batches_completed_.Increment();
+      }
     }
-
-    if (inserted > 0) {
-      rows_inserted_.IncrementBy(inserted);
-      batches_completed_.Increment();
+    if (PREDICT_FALSE(write_interval_millis_ > 0)) {
+      SleepFor(MonoDelta::FromMilliseconds(write_interval_millis_));
+    }
+    // Write delete row to cluster.
+    if (write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE) {
+      for (auto key : keys) {
+        gscoped_ptr<KuduDelete> op(table->NewDelete());
+        KuduPartialRow* row = op->mutable_row();
+        tools::WriteValueToColumn(schema_, 0, key, row);
+        CHECK_OK(session->Apply(op.release()));
+        deleted++;
+      }
+      Status s = session->Flush();
+      if (PREDICT_FALSE(!s.ok())) {
+        deleted -= GetNumberOfErrors(session.get());
+      }
+      if (deleted > 0) {
+        rows_deleted_.IncrementBy(deleted);
+      }
     }
   }
 }
@@ -220,7 +221,10 @@ void TestWorkload::ReadThread() {
     CHECK_OK(scanner.SetTimeoutMillis(read_timeout_millis_));
     CHECK_OK(scanner.SetFaultTolerant());
 
-    int64_t expected_row_count = rows_inserted_.Load();
+    // Note: when INSERT_RANDOM_ROWS_WITH_DELETE is used, ReadThread doesn't really verify
+    // anything except that a scan works.
+    int64_t expected_row_count = write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE ?
+                                 0 : rows_inserted_.Load();
     size_t row_count = 0;
 
     CHECK_OK(scanner.Open());
@@ -234,6 +238,25 @@ void TestWorkload::ReadThread() {
   }
 }
 
+size_t TestWorkload::GetNumberOfErrors(KuduSession* session) {
+  vector<KuduError*> errors;
+  ElementDeleter d(&errors);
+  bool overflow;
+  session->GetPendingErrors(&errors, &overflow);
+  CHECK(!overflow);
+  for (const KuduError* e : errors) {
+    if ((timeout_allowed_ && e->status().IsTimedOut()) ||
+        (not_found_allowed_ && e->status().IsNotFound()) ||
+        (already_present_allowed_ && e->status().IsAlreadyPresent()) ||
+        (network_error_allowed_ && e->status().IsNetworkError()) ||
+        (remote_error_allowed_ && e->status().IsRemoteError())) {
+      continue;
+    }
+    LOG(FATAL) << e->status().ToString();
+  }
+  return errors.size();
+}
+
 shared_ptr<KuduClient> TestWorkload::CreateClient() {
   CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
   return client_;
@@ -259,7 +282,7 @@ void TestWorkload::Setup() {
 
   if (!table_exists) {
     // Create split rows.
-    std::vector<const KuduPartialRow*> splits;
+    vector<const KuduPartialRow*> splits;
     for (int i = 1; i < num_tablets_; i++) {
       KuduPartialRow* r = schema_.NewRow();
       CHECK_OK(r->SetInt32("key", MathLimits<int32_t>::kMax / num_tablets_ * i));
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 689f139..00a35af 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_INTEGRATION_TESTS_TEST_WORKLOAD_H
 #define KUDU_INTEGRATION_TESTS_TEST_WORKLOAD_H
 
+#include <cstddef>
 #include <cstdint>
 #include <ostream>
 #include <string>
@@ -76,6 +77,10 @@ class TestWorkload {
     write_batch_size_ = s;
   }
 
+  void set_write_interval_millis(int t) {
+    write_interval_millis_ = t;
+  }
+
   void set_client_default_rpc_timeout_millis(int t) {
     client_builder_.default_rpc_timeout(MonoDelta::FromMilliseconds(t));
   }
@@ -149,6 +154,11 @@ class TestWorkload {
     // duplicate, but with 32-bit keys, they won't be frequent.
     INSERT_RANDOM_ROWS,
 
+    // Insert random rows, then delete them.
+    // This may cause an occasional duplicate, but with 32-bit keys, they won't be frequent.
+    // This requires two flush operations.
+    INSERT_RANDOM_ROWS_WITH_DELETE,
+
     // All threads generate updates against a single row.
     UPDATE_ONE_ROW,
 
@@ -170,6 +180,7 @@ class TestWorkload {
         set_already_present_allowed(true);
         break;
       case INSERT_RANDOM_ROWS:
+      case INSERT_RANDOM_ROWS_WITH_DELETE:
       case UPDATE_ONE_ROW:
       case INSERT_SEQUENTIAL_ROWS:
         set_already_present_allowed(false);
@@ -199,6 +210,12 @@ class TestWorkload {
     return rows_inserted_.Load();
   }
 
+  // Return the number of rows deleted so far. This may be called either
+  // during or after the write workload.
+  int64_t rows_deleted() const {
+    return rows_deleted_.Load();
+  }
+
   // Return the number of batches in which we have successfully inserted at
   // least one row.
   // NOTE: it is not safe to assume that this is exactly equal to the number
@@ -214,6 +231,7 @@ class TestWorkload {
   void OpenTable(client::sp::shared_ptr<client::KuduTable>* table);
   void WriteThread();
   void ReadThread();
+  size_t GetNumberOfErrors(client::KuduSession* session);
 
   cluster::MiniCluster* cluster_;
   client::KuduClientBuilder client_builder_;
@@ -225,6 +243,7 @@ class TestWorkload {
   int num_read_threads_;
   int read_timeout_millis_;
   int write_batch_size_;
+  int write_interval_millis_;
   int write_timeout_millis_;
   bool timeout_allowed_;
   bool not_found_allowed_;
@@ -241,6 +260,7 @@ class TestWorkload {
   CountDownLatch start_latch_;
   AtomicBool should_run_;
   AtomicInt<int64_t> rows_inserted_;
+  AtomicInt<int64_t> rows_deleted_;
   AtomicInt<int64_t> batches_completed_;
   AtomicInt<int32_t> sequential_key_gen_;
 
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 4ea8684..770e3aa 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -744,10 +744,16 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
                                             dfr));
   VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() << " for read";
 
-  // Merge the deleted row count of the old DMS to the RowSetMetadata if necessary.
-  rowset_metadata_->IncrementLiveRows(-deleted_row_count_);
-
-  RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), block_id));
+  {
+    // Merge the deleted row count of the old DMS to the RowSetMetadata
+    // and reset deleted_row_count_ should be atomic, so we lock the
+    // component_lock_ in exclusive mode.
+    std::lock_guard<rw_spinlock> lock(component_lock_);
+    RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(),
+                                                             deleted_row_count_,
+                                                             block_id));
+    deleted_row_count_ = 0;
+  }
   if (flush_type == FLUSH_METADATA) {
     RETURN_NOT_OK_PREPEND(rowset_metadata_->Flush(),
                           Substitute("Unable to commit Delta block metadata for: $0",
@@ -814,7 +820,6 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
     CHECK_EQ(redo_delta_stores_[idx], old_dms)
         << "Another thread modified the delta store list during flush";
     redo_delta_stores_[idx] = dfr;
-    deleted_row_count_ = 0;
   }
 
   return Status::OK();
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index 072b6e6..a4ea0a1 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -373,7 +373,7 @@ class DeltaTracker {
   // TODO(perf): this needs to be more fine grained
   mutable Mutex compact_flush_lock_;
 
-  // Number of deleted rows for a DMS that is currently being flushed.
+  // Number of deleted rows for a DMS that is currently being flushed.
   // When the flush completes, this is merged into the RowSetMetadata
   // and reset.
   int64_t deleted_row_count_;
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 8143fc3..b2395a9 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -467,7 +467,7 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     s = cur_redo_writer_->FinishAndReleaseBlock(block_transaction_.get());
     if (!s.IsAborted()) {
       RETURN_NOT_OK(s);
-      cur_drs_metadata_->CommitRedoDeltaDataBlock(0, cur_redo_ds_block_id_);
+      cur_drs_metadata_->CommitRedoDeltaDataBlock(0, 0, cur_redo_ds_block_id_);
     } else {
       DCHECK_EQ(cur_redo_delta_stats->min_timestamp(), Timestamp::kMax);
     }
diff --git a/src/kudu/tablet/metadata-test.cc b/src/kudu/tablet/metadata-test.cc
index ec77b00..e67529d 100644
--- a/src/kudu/tablet/metadata-test.cc
+++ b/src/kudu/tablet/metadata-test.cc
@@ -46,7 +46,7 @@ class MetadataTest : public KuduTest {
     tablet_meta_ = new TabletMetadata(nullptr, "fake-tablet");
     CHECK_OK(RowSetMetadata::CreateNew(tablet_meta_.get(), 0, &meta_));
     for (int i = 0; i < all_blocks_.size(); i++) {
-      CHECK_OK(meta_->CommitRedoDeltaDataBlock(i, all_blocks_[i]));
+      CHECK_OK(meta_->CommitRedoDeltaDataBlock(i, 0, all_blocks_[i]));
     }
     CHECK_EQ(4, meta_->redo_delta_blocks().size());
   }
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 2e3afaa..dab3913 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -35,6 +35,7 @@
 #include "kudu/common/rowblock.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
+#include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/local_tablet_writer.h"
@@ -408,13 +409,18 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
       "num_rowsets");
     shared_ptr<TimeSeries> memrowset_size_ts = ts_collector_.GetTimeSeries(
       "memrowset_kb");
+    shared_ptr<TimeSeries> num_live_rows_ts = ts_collector_.GetTimeSeries(
+      "num_live_rows");
 
     while (running_insert_count_.count() > 0) {
       num_rowsets_ts->SetValue(tablet()->num_rowsets());
       memrowset_size_ts->SetValue(tablet()->MemRowSetSize() / 1024.0);
+      int64_t num_live_rows;
+      ignore_result(tablet()->CountLiveRows(&num_live_rows));
+      num_live_rows_ts->SetValue(num_live_rows);
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(250));
+      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(10));
     }
   }
 
@@ -491,6 +497,7 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
   FLAGS_flusher_initial_frequency_ms = 1;
   FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01f;
   FLAGS_tablet_delta_store_minor_compact_max = 10;
+  this->StartThreads(1, &TestFixture::CollectStatisticsThread);
   this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread);
   this->StartThreads(FLAGS_num_compact_threads, &TestFixture::CompactThread);
   this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread);
diff --git a/src/kudu/tablet/rowset_metadata.cc b/src/kudu/tablet/rowset_metadata.cc
index 6bd19ce..6c9705c 100644
--- a/src/kudu/tablet/rowset_metadata.cc
+++ b/src/kudu/tablet/rowset_metadata.cc
@@ -187,10 +187,12 @@ void RowSetMetadata::SetColumnDataBlocks(const std::map<ColumnId, BlockId>& bloc
 }
 
 Status RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id,
+                                                int64_t num_deleted_rows,
                                                 const BlockId& block_id) {
   std::lock_guard<LockType> l(lock_);
   last_durable_redo_dms_id_ = dms_id;
   redo_delta_blocks_.push_back(block_id);
+  IncrementLiveRowsUnlocked(-num_deleted_rows);
   return Status::OK();
 }
 
@@ -278,14 +280,18 @@ void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update,
   blocks_by_col_id_.shrink_to_fit();
 }
 
-void RowSetMetadata::IncrementLiveRows(int64_t row_count) {
+void RowSetMetadata::IncrementLiveRowsUnlocked(int64_t row_count) {
   if (tablet_metadata_->supports_live_row_count() && row_count != 0) {
-    std::lock_guard<LockType> l(lock_);
     live_row_count_ += row_count;
     DCHECK_GE(live_row_count_, 0);
   }
 }
 
+void RowSetMetadata::IncrementLiveRows(int64_t row_count) {
+  std::lock_guard<LockType> l(lock_);
+  IncrementLiveRowsUnlocked(row_count);
+}
+
 int64_t RowSetMetadata::live_row_count() const {
   std::lock_guard<LockType> l(lock_);
   DCHECK_GE(live_row_count_, 0);
diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h
index 94af6f3..7f67b63 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -117,7 +117,11 @@ class RowSetMetadata {
 
   void SetColumnDataBlocks(const std::map<ColumnId, BlockId>& blocks_by_col_id);
 
-  Status CommitRedoDeltaDataBlock(int64_t dms_id, const BlockId& block_id);
+  // Atomically commit the new redo delta block to RowSetMetadata.
+  // This atomic operation includes updates to last_durable_redo_dms_id_ and live_row_count_.
+  Status CommitRedoDeltaDataBlock(int64_t dms_id,
+                                  int64_t num_deleted_rows,
+                                  const BlockId& block_id);
 
   Status CommitUndoDeltaDataBlock(const BlockId& block_id);
 
@@ -256,6 +260,8 @@ class RowSetMetadata {
 
   Status InitFromPB(const RowSetDataPB& pb);
 
+  void IncrementLiveRowsUnlocked(int64_t row_count);
+
   TabletMetadata* const tablet_metadata_;
   bool initted_;
   int64_t id_;