You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/07/18 23:13:05 UTC

[1/3] incubator-kudu git commit: rw_mutex: prevent recursive use

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 7e3794c47 -> 2521554b3


rw_mutex: prevent recursive use

Todd provided an example[1] of rwlocks deadlocking because of a fairness
policy. In the example, T1 (holding the lock for reading) join()ed on T2
(trying to acquire the lock for reading), all while T3 was waiting to acquire
the lock for writing. The lock's fairness policy queued T2's read request
behind T3's pending write request. T2 could not proceed until T3 did, T3
could not proceed until T1 released its read lock, and T1 could not release
it until T2 finished, so all three threads were deadlocked. The takeaway is
to be careful when calling join() while holding locks.

Beyond that, deadlocks can arise if rwlocks are taken recursively. That's
not a feature we need in our rwlocks, so I tried to disable it at the
pthread level. Unfortunately, the best I can do there is disable recursive
write lock acquisition; read locks are apparently always recursive (see "man
pthread_rwlockattr_setkind_np"). So instead, I built checks for recursive
acquisition into the RWMutex itself. They're quite slow, so they're only
enabled in debug builds.

Note that pthread rwlocks do have some built-in deadlock detection (lock
calls may return EDEADLK), but it doesn't appear to be comprehensive.

1. https://issues.apache.org/jira/browse/HDFS-2223?focusedCommentId=13097647&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13097647

Change-Id: I7ae30ec123a16c39ef0c15ee2d2176f807df03db
Reviewed-on: http://gerrit.cloudera.org:8080/3641
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/2a3d898c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/2a3d898c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/2a3d898c

Branch: refs/heads/master
Commit: 2a3d898cef167cda57570f963a46f626d59651e6
Parents: 7e3794c
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Jul 13 13:05:09 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Jul 18 23:00:48 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/CMakeLists.txt   |   1 +
 src/kudu/util/rw_mutex-test.cc | 182 ++++++++++++++++++++++++++++++++++++
 src/kudu/util/rw_mutex.cc      |  90 +++++++++++++++++-
 src/kudu/util/rw_mutex.h       |  45 ++++++++-
 4 files changed, 314 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2a3d898c/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index a924f38..3c514a3 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -317,6 +317,7 @@ ADD_KUDU_TEST(random_util-test)
 ADD_KUDU_TEST(resettable_heartbeater-test)
 ADD_KUDU_TEST(rle-test)
 ADD_KUDU_TEST(rolling_log-test)
+ADD_KUDU_TEST(rw_mutex-test)
 ADD_KUDU_TEST(rw_semaphore-test)
 ADD_KUDU_TEST(rwc_lock-test)
 ADD_KUDU_TEST(safe_math-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2a3d898c/src/kudu/util/rw_mutex-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/rw_mutex-test.cc b/src/kudu/util/rw_mutex-test.cc
new file mode 100644
index 0000000..7f9e376
--- /dev/null
+++ b/src/kudu/util/rw_mutex-test.cc
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/test_util.h"
+
+using std::lock_guard;
+using std::thread;
+using std::try_to_lock;
+using std::unique_lock;
+using std::vector;
+
+namespace kudu {
+
+class RWMutexTest : public KuduTest,  // Fixture parameterized by RWMutex::Priority.
+                    public ::testing::WithParamInterface<RWMutex::Priority> {
+ public:
+  RWMutexTest()
+     : lock_(GetParam()) {  // Build the mutex with the priority under test.
+  }
+ protected:
+  RWMutex lock_;  // Shared (by reference) with every thread a test spawns.
+};
+
+// Run every TEST_P in this file once per RWMutex priority (reader- and writer-preferring).
+INSTANTIATE_TEST_CASE_P(Priorities, RWMutexTest,
+                        ::testing::Values(RWMutex::Priority::PREFER_READING,
+                                          RWMutex::Priority::PREFER_WRITING));
+
+// Stress test: 8 threads contend on the RWMutex for ~1s, trying to expose deadlocks.
+TEST_P(RWMutexTest, TestDeadlocks) {
+  uint64_t number_of_writes = 0;  // Plain counter: only modified while holding the write lock.
+  AtomicInt<uint64_t> number_of_reads(0);  // Atomic: readers may increment it concurrently.
+
+  AtomicBool done(false);  // Flipped once to tell every worker to exit.
+  vector<thread> threads;
+
+  // Start blocking (lock_guard) and non-blocking (try_to_lock) writer workloads.
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        lock_guard<RWMutex> l(lock_);
+        number_of_writes++;
+      }
+    });
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        unique_lock<RWMutex> l(lock_, try_to_lock);
+        if (l.owns_lock()) {
+          number_of_writes++;
+        }
+      }
+    });
+  }
+
+  // Start blocking and non-blocking shared-lock (read-only) workloads.
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        shared_lock<RWMutex> l(lock_);
+        number_of_reads.Increment();
+      }
+    });
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        shared_lock<RWMutex> l(lock_, try_to_lock);
+        if (l.owns_lock()) {
+          number_of_reads.Increment();
+        }
+      }
+    });
+  }
+
+  SleepFor(MonoDelta::FromSeconds(1));  // Let workers contend; a deadlock hangs the joins below.
+  done.Store(true);
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  shared_lock<RWMutex> l(lock_);  // Read the counters under the same lock the workers used.
+  LOG(INFO) << "Number of writes: " << number_of_writes;
+  LOG(INFO) << "Number of reads: " << number_of_reads.Load();
+}
+
+#ifndef NDEBUG
+// Tests that the RWMutex wrapper crashes on misuse: recursive acquisition, unlocking a
+// lock that isn't held, or unlocking in the wrong mode. The checks exist only in debug builds.
+TEST_P(RWMutexTest, TestLockChecking) {
+  EXPECT_DEATH({  // Recursive read lock.
+    lock_.ReadLock();
+    lock_.ReadLock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    CHECK(lock_.TryReadLock());
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({  // Attempted read-to-write upgrade.
+    lock_.ReadLock();
+    lock_.WriteLock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    CHECK(lock_.TryWriteLock());
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({  // Attempted write-to-read downgrade.
+    lock_.WriteLock();
+    lock_.ReadLock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    CHECK(lock_.TryReadLock());
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({  // Recursive write lock.
+    lock_.WriteLock();
+    lock_.WriteLock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    CHECK(lock_.TryWriteLock());
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({  // Unlocking a lock that was never acquired.
+    lock_.ReadUnlock();
+  }, "wasn't holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteUnlock();
+  }, "wasn't holding lock for writing");
+
+  EXPECT_DEATH({  // Releasing with the unlock call for the other mode.
+    lock_.ReadLock();
+    lock_.WriteUnlock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    lock_.WriteUnlock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.ReadUnlock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    lock_.ReadUnlock();
+  }, "already holding lock for writing");
+}
+#endif  // NDEBUG
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2a3d898c/src/kudu/util/rw_mutex.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/rw_mutex.cc b/src/kudu/util/rw_mutex.cc
index d9473bb..6dd8f3e 100644
--- a/src/kudu/util/rw_mutex.cc
+++ b/src/kudu/util/rw_mutex.cc
@@ -18,6 +18,12 @@
 #include "kudu/util/rw_mutex.h"
 
 #include <glog/logging.h>
+#include <mutex>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/env.h"
+
+using std::lock_guard;
 
 namespace {
 
@@ -30,11 +36,19 @@ void unlock_rwlock(pthread_rwlock_t* rwlock) {
 
 namespace kudu {
 
-RWMutex::RWMutex() {
+RWMutex::RWMutex()  // Defaults to reader-preferring priority.
+#ifndef NDEBUG
+    : writer_tid_(0)  // 0 is the "no writer" sentinel, as in UnmarkForWriting().
+#endif
+{
   Init(Priority::PREFER_READING);
 }
 
-RWMutex::RWMutex(Priority prio) {
+RWMutex::RWMutex(Priority prio)
+#ifndef NDEBUG
+    : writer_tid_(0)  // 0 is the "no writer" sentinel, as in UnmarkForWriting().
+#endif
+{
   Init(prio);
 }
 
@@ -73,39 +87,111 @@ RWMutex::~RWMutex() {
 }
 
 void RWMutex::ReadLock() {
+  CheckLockState(LockState::NEITHER);  // Debug builds: crash if this thread already holds it.
   int rv = pthread_rwlock_rdlock(&native_handle_);
   DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForReading();  // Record this thread as a reader (no-op in release builds).
 }
 
 void RWMutex::ReadUnlock() {
+  CheckLockState(LockState::READER);  // Debug builds: this thread must hold a read lock.
+  UnmarkForReading();  // Drop the bookkeeping while the lock is still held.
   unlock_rwlock(&native_handle_);
 }
 
 bool RWMutex::TryReadLock() {
+  CheckLockState(LockState::NEITHER);  // Debug builds: crash if this thread already holds it.
   int rv = pthread_rwlock_tryrdlock(&native_handle_);
   if (rv == EBUSY) {
     return false;
   }
   DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForReading();  // Only reached once the read lock was actually acquired.
   return true;
 }
 
 void RWMutex::WriteLock() {
+  CheckLockState(LockState::NEITHER);  // Debug builds: crash if this thread already holds it.
   int rv = pthread_rwlock_wrlock(&native_handle_);
   DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForWriting();  // Record this thread as the writer (no-op in release builds).
 }
 
 void RWMutex::WriteUnlock() {
+  CheckLockState(LockState::WRITER);  // Debug builds: this thread must hold the write lock.
+  UnmarkForWriting();  // Must precede the unlock, or a new writer's writer_tid_ could be cleared.
   unlock_rwlock(&native_handle_);
 }
 
 bool RWMutex::TryWriteLock() {
+  CheckLockState(LockState::NEITHER);  // Debug builds: crash if this thread already holds it.
   int rv = pthread_rwlock_trywrlock(&native_handle_);
   if (rv == EBUSY) {
     return false;
   }
   DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForWriting();  // Only reached once the write lock was actually acquired.
   return true;
 }
 
+#ifndef NDEBUG
+
+void RWMutex::AssertAcquiredForReading() const {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  CHECK(ContainsKey(reader_tids_, Env::Default()->gettid()));  // Caller must be a reader.
+}
+
+void RWMutex::AssertAcquiredForWriting() const {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  CHECK_EQ(Env::Default()->gettid(), writer_tid_);  // Caller must be the current writer.
+}
+
+void RWMutex::CheckLockState(LockState state) const {
+  pid_t my_tid = Env::Default()->gettid();  // Compared against the recorded lock holders.
+  bool is_reader;
+  bool is_writer;
+  {  // Snapshot holder state under tid_lock_; the CHECKs below run after it is released.
+    lock_guard<simple_spinlock> l(tid_lock_);
+    is_reader = ContainsKey(reader_tids_, my_tid);
+    is_writer = writer_tid_ == my_tid;
+  }
+
+  switch (state) {
+    case LockState::NEITHER:  // About to acquire: this thread must hold nothing.
+      CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+      CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+      break;
+    case LockState::READER:  // About to release a read lock.
+      CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+      CHECK(is_reader) << "Invalid state, wasn't holding lock for reading";
+      break;
+    case LockState::WRITER:  // About to release the write lock.
+      CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+      CHECK(is_writer) << "Invalid state, wasn't holding lock for writing";
+      break;
+  }
+}
+
+void RWMutex::MarkForReading() {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  reader_tids_.insert(Env::Default()->gettid());  // Called after the read lock is acquired.
+}
+
+void RWMutex::MarkForWriting() {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  writer_tid_ = Env::Default()->gettid();  // Called after the write lock is acquired.
+}
+
+void RWMutex::UnmarkForReading() {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  reader_tids_.erase(Env::Default()->gettid());  // Called before the read lock is released.
+}
+
+void RWMutex::UnmarkForWriting() {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  writer_tid_ = 0;  // Back to the "no writer" sentinel; called before the lock is released.
+}
+
+#endif
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2a3d898c/src/kudu/util/rw_mutex.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/rw_mutex.h b/src/kudu/util/rw_mutex.h
index 64a88d1..969f2be 100644
--- a/src/kudu/util/rw_mutex.h
+++ b/src/kudu/util/rw_mutex.h
@@ -18,14 +18,17 @@
 #pragma once
 
 #include <pthread.h>
+#include <unordered_set>
 
 #include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
 
 namespace kudu {
 
-// Read/write mutex.
+// Read/write mutex. Implemented as a thin wrapper around pthread_rwlock_t.
 //
-// Implemented as a thin wrapper around pthread_rwlock_t.
+// Although pthread_rwlock_t allows recursive acquisition, this wrapper does
+// not, and will crash in debug mode if recursive acquisition is detected.
 class RWMutex {
  public:
 
@@ -59,6 +62,14 @@ class RWMutex {
   void WriteUnlock();
   bool TryWriteLock();
 
+#ifndef NDEBUG
+  void AssertAcquiredForReading() const;
+  void AssertAcquiredForWriting() const;
+#else
+  void AssertAcquiredForReading() const {}
+  void AssertAcquiredForWriting() const {}
+#endif
+
   // Aliases for use with std::lock_guard and kudu::shared_lock.
   void lock() { WriteLock(); }
   void unlock() { WriteUnlock(); }
@@ -70,8 +81,38 @@ class RWMutex {
  private:
   void Init(Priority prio);
 
+  enum class LockState {
+    NEITHER,
+    READER,
+    WRITER,
+  };
+#ifndef NDEBUG
+  void CheckLockState(LockState state) const;
+  void MarkForReading();
+  void MarkForWriting();
+  void UnmarkForReading();
+  void UnmarkForWriting();
+#else
+  void CheckLockState(LockState state) const {}
+  void MarkForReading() {}
+  void MarkForWriting() {}
+  void UnmarkForReading() {}
+  void UnmarkForWriting() {}
+#endif
+
   pthread_rwlock_t native_handle_;
 
+#ifndef NDEBUG
+  // Protects reader_tids_ and writer_tid_.
+  mutable simple_spinlock tid_lock_;
+
+  // Tracks all current readers by tid.
+  std::unordered_set<pid_t> reader_tids_;
+
+  // Tracks the current writer (if one exists) by tid.
+  pid_t writer_tid_;
+#endif
+
   DISALLOW_COPY_AND_ASSIGN(RWMutex);
 };
 


[2/3] incubator-kudu git commit: master: add assert checks for leader_lock

Posted by to...@apache.org.
master: add assert checks for leader_lock

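A side effect of the recursion checking in RWMutex is that we can now assert
that an RWMutex is held for reading or writing. Let's add such assertions to
the various catalog manager entry points. Tests that call those entry points
directly now take a ScopedLeaderSharedLock first, using a new
first_failed_status() helper to check that the catalog manager is initialized
and is the leader.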

Change-Id: Iefb5762c70192b27490cc71e20568815d18d6ad5
Reviewed-on: http://gerrit.cloudera.org:8080/3642
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/0a106063
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/0a106063
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/0a106063

Branch: refs/heads/master
Commit: 0a10606379b08b731ef680afd2291e44c671315a
Parents: 2a3d898
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Jul 13 15:23:37 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Jul 18 23:01:00 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc                  |  7 ++++--
 src/kudu/integration-tests/alter_table-test.cc  | 12 ++++++----
 .../create-table-stress-test.cc                 | 17 +++++++++-----
 src/kudu/integration-tests/mini_cluster.cc      | 17 +++++++-------
 src/kudu/integration-tests/mini_cluster.h       |  7 ------
 src/kudu/master/catalog_manager.cc              | 24 +++++++++++++-------
 src/kudu/master/catalog_manager.h               |  9 ++++++++
 src/kudu/master/master-test-util.h              | 24 ++++++++++++++++----
 src/kudu/master/master.cc                       |  2 +-
 9 files changed, 78 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 9d26b8b..404c7b7 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -173,8 +173,11 @@ class ClientTest : public KuduTest {
     GetTableLocationsRequestPB req;
     GetTableLocationsResponsePB resp;
     req.mutable_table()->set_table_name(table->name());
-    CHECK_OK(cluster_->mini_master()->master()->catalog_manager()->GetTableLocations(
-        &req, &resp));
+    CatalogManager* catalog =
+        cluster_->mini_master()->master()->catalog_manager();
+    CatalogManager::ScopedLeaderSharedLock l(catalog);
+    CHECK_OK(l.first_failed_status());
+    CHECK_OK(catalog->GetTableLocations(&req, &resp));
     CHECK(resp.tablet_locations_size() > 0);
     return resp.tablet_locations(0).tablet_id();
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index c876f7e..8c97a39 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -294,15 +294,19 @@ TEST_F(AlterTableTest, TestAddNotNullableColumnWithoutDefaults) {
 
   {
     AlterTableRequestPB req;
-    req.mutable_table()->set_table_name(kTableName);
+    AlterTableResponsePB resp;
 
+    req.mutable_table()->set_table_name(kTableName);
     AlterTableRequestPB::Step *step = req.add_alter_schema_steps();
     step->set_type(AlterTableRequestPB::ADD_COLUMN);
     ColumnSchemaToPB(ColumnSchema("c2", INT32),
                      step->mutable_add_column()->mutable_schema());
-    AlterTableResponsePB resp;
-    Status s = cluster_->mini_master()->master()->catalog_manager()->AlterTable(
-      &req, &resp, nullptr);
+
+    master::CatalogManager* catalog =
+        cluster_->mini_master()->master()->catalog_manager();
+    master::CatalogManager::ScopedLeaderSharedLock l(catalog);
+    ASSERT_OK(l.first_failed_status());
+    Status s = catalog->AlterTable(&req, &resp, nullptr);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "column `c2`: NOT NULL columns must have a default");
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/integration-tests/create-table-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/create-table-stress-test.cc b/src/kudu/integration-tests/create-table-stress-test.cc
index c740e58..c4c63f4 100644
--- a/src/kudu/integration-tests/create-table-stress-test.cc
+++ b/src/kudu/integration-tests/create-table-stress-test.cc
@@ -222,6 +222,11 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
                                        FLAGS_num_test_tablets, &resp));
   }
 
+  master::CatalogManager* catalog =
+      cluster_->mini_master()->master()->catalog_manager();
+  master::CatalogManager::ScopedLeaderSharedLock l(catalog);
+  ASSERT_OK(l.first_failed_status());
+
   // Test asking for 0 tablets, should fail
   LOG(INFO) << CURRENT_TEST_NAME() << ": Step 3. Asking for zero tablets...";
   LOG_TIMING(INFO, "asking for zero tablets") {
@@ -229,7 +234,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(0);
-    Status s = cluster_->mini_master()->master()->catalog_manager()->GetTableLocations(&req, &resp);
+    Status s = catalog->GetTableLocations(&req, &resp);
     ASSERT_STR_CONTAINS(s.ToString(), "must be greater than 0");
   }
 
@@ -240,7 +245,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(1);
-    ASSERT_OK(cluster_->mini_master()->master()->catalog_manager()->GetTableLocations(&req, &resp));
+    ASSERT_OK(catalog->GetTableLocations(&req, &resp));
     ASSERT_EQ(resp.tablet_locations_size(), 1);
     // empty since it's the first
     ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_start(), "");
@@ -255,7 +260,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(half_tablets);
-    ASSERT_OK(cluster_->mini_master()->master()->catalog_manager()->GetTableLocations(&req, &resp));
+    ASSERT_OK(catalog->GetTableLocations(&req, &resp));
     ASSERT_EQ(half_tablets, resp.tablet_locations_size());
   }
 
@@ -266,7 +271,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(FLAGS_num_test_tablets);
-    ASSERT_OK(cluster_->mini_master()->master()->catalog_manager()->GetTableLocations(&req, &resp));
+    ASSERT_OK(catalog->GetTableLocations(&req, &resp));
     ASSERT_EQ(FLAGS_num_test_tablets, resp.tablet_locations_size());
   }
 
@@ -274,7 +279,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
   LOG(INFO) << "Tables and tablets:";
   LOG(INFO) << "========================================================";
   std::vector<scoped_refptr<master::TableInfo> > tables;
-  cluster_->mini_master()->master()->catalog_manager()->GetAllTables(&tables);
+  catalog->GetAllTables(&tables);
   for (const scoped_refptr<master::TableInfo>& table_info : tables) {
     LOG(INFO) << "Table: " << table_info->ToString();
     std::vector<scoped_refptr<master::TabletInfo> > tablets;
@@ -310,7 +315,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(1);
     req.set_partition_key_start(start_key_middle);
-    ASSERT_OK(cluster_->mini_master()->master()->catalog_manager()->GetTableLocations(&req, &resp));
+    ASSERT_OK(catalog->GetTableLocations(&req, &resp));
     ASSERT_EQ(1, resp.tablet_locations_size()) << "Response: [" << resp.DebugString() << "]";
     ASSERT_EQ(start_key_middle, resp.tablet_locations(0).partition().partition_key_start());
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/integration-tests/mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.cc b/src/kudu/integration-tests/mini_cluster.cc
index 7133b8e..2b5ef67 100644
--- a/src/kudu/integration-tests/mini_cluster.cc
+++ b/src/kudu/integration-tests/mini_cluster.cc
@@ -190,7 +190,7 @@ MiniMaster* MiniCluster::leader_mini_master() {
       }
       CatalogManager::ScopedLeaderSharedLock l(
           master->master()->catalog_manager());
-      if (l.catalog_status().ok() && l.leader_status().ok()) {
+      if (l.first_failed_status().ok()) {
         return master;
       }
     }
@@ -241,19 +241,18 @@ string MiniCluster::GetTabletServerFsRoot(int idx) {
 }
 
 Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
-                                        int expected_count) {
-  TabletLocationsPB locations;
-  return WaitForReplicaCount(tablet_id, expected_count, &locations);
-}
-
-Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
                                         int expected_count,
                                         TabletLocationsPB* locations) {
   Stopwatch sw;
   sw.start();
   while (sw.elapsed().wall_seconds() < kTabletReportWaitTimeSeconds) {
-    Status s =
-        leader_mini_master()->master()->catalog_manager()->GetTabletLocations(tablet_id, locations);
+    CatalogManager* catalog = leader_mini_master()->master()->catalog_manager();
+    Status s;
+    {
+      CatalogManager::ScopedLeaderSharedLock l(catalog);
+      RETURN_NOT_OK(l.first_failed_status());
+      s = catalog->GetTabletLocations(tablet_id, locations);
+    }
     if (s.ok() && locations->replicas_size() == expected_count) {
       return Status::OK();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/integration-tests/mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.h b/src/kudu/integration-tests/mini_cluster.h
index 40e4af2..e682316 100644
--- a/src/kudu/integration-tests/mini_cluster.h
+++ b/src/kudu/integration-tests/mini_cluster.h
@@ -130,13 +130,6 @@ class MiniCluster {
   std::string GetTabletServerFsRoot(int idx);
 
   // Wait for the given tablet to have 'expected_count' replicas
-  // reported on the master.
-  // Requires that the master has started.
-  // Returns a bad Status if the tablet does not reach the required count
-  // within kTabletReportWaitTimeSeconds.
-  Status WaitForReplicaCount(const std::string& tablet_id, int expected_count);
-
-  // Wait for the given tablet to have 'expected_count' replicas
   // reported on the master. Returns the locations in '*locations'.
   // Requires that the master has started;
   // Returns a bad Status if the tablet does not reach the required count

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 88bd3e5..9271436 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -754,6 +754,7 @@ Status CatalogManager::CheckOnline() const {
 Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
                                    CreateTableResponsePB* resp,
                                    rpc::RpcContext* rpc) {
+  leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
   Status s;
 
@@ -954,6 +955,7 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
 
 Status CatalogManager::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
                                          IsCreateTableDoneResponsePB* resp) {
+  leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
   scoped_refptr<TableInfo> table;
@@ -1025,11 +1027,12 @@ Status CatalogManager::FindTable(const TableIdentifierPB& table_identifier,
 Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
                                    DeleteTableResponsePB* resp,
                                    rpc::RpcContext* rpc) {
+  leader_lock_.AssertAcquiredForReading();
+  RETURN_NOT_OK(CheckOnline());
+
   LOG(INFO) << "Servicing DeleteTable request from " << RequestorString(rpc)
             << ": " << req->ShortDebugString();
 
-  RETURN_NOT_OK(CheckOnline());
-
   // 1. Look up the table, lock it, and mark it as removed.
   TRACE("Looking up table");
   scoped_refptr<TableInfo> table;
@@ -1201,11 +1204,12 @@ static Status ApplyAlterSteps(const SysTablesEntryPB& current_pb,
 Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
                                   AlterTableResponsePB* resp,
                                   rpc::RpcContext* rpc) {
+  leader_lock_.AssertAcquiredForReading();
+  RETURN_NOT_OK(CheckOnline());
+
   LOG(INFO) << "Servicing AlterTable request from " << RequestorString(rpc)
             << ": " << req->ShortDebugString();
 
-  RETURN_NOT_OK(CheckOnline());
-
   // 1. Lookup the table and verify if it exists.
   TRACE("Looking up table");
   scoped_refptr<TableInfo> table;
@@ -1345,6 +1349,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
 Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
                                         IsAlterTableDoneResponsePB* resp,
                                         rpc::RpcContext* rpc) {
+  leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
   scoped_refptr<TableInfo> table;
@@ -1372,6 +1377,7 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
 
 Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
                                       GetTableSchemaResponsePB* resp) {
+  leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
   scoped_refptr<TableInfo> table;
@@ -1409,6 +1415,7 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
 
 Status CatalogManager::ListTables(const ListTablesRequestPB* req,
                                   ListTablesResponsePB* resp) {
+  leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
   shared_lock<LockType> l(lock_);
@@ -1466,6 +1473,8 @@ Status CatalogManager::ProcessTabletReport(TSDescriptor* ts_desc,
                "requestor", rpc->requestor_string(),
                "num_tablets", report.updated_tablets_size());
 
+  leader_lock_.AssertAcquiredForReading();
+
   if (VLOG_IS_ON(2)) {
     VLOG(2) << "Received tablet report from " <<
       RequestorString(rpc) << ": " << report.DebugString();
@@ -3018,6 +3027,7 @@ Status CatalogManager::BuildLocationsForTablet(const scoped_refptr<TabletInfo>&
 
 Status CatalogManager::GetTabletLocations(const std::string& tablet_id,
                                           TabletLocationsPB* locs_pb) {
+  leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
   locs_pb->mutable_replicas()->Clear();
@@ -3034,6 +3044,7 @@ Status CatalogManager::GetTabletLocations(const std::string& tablet_id,
 
 Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
                                          GetTableLocationsResponsePB* resp) {
+  leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
   // If start-key is > end-key report an error instead of swap the two
@@ -3214,10 +3225,7 @@ bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedOrRespond(
 template<typename RespClass>
 bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrRespond(
     RespClass* resp, RpcContext* rpc) {
-  Status& s = catalog_status_;
-  if (PREDICT_TRUE(s.ok())) {
-    s = leader_status_;
-  }
+  const Status& s = first_failed_status();
   if (PREDICT_TRUE(s.ok())) {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 44f84c0..0196d8d 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -313,6 +313,15 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
       return leader_status_;
     }
 
+    // Returns catalog_status() if it is not OK, otherwise leader_status()
+    // (so the result is OK only when both checks passed).
+    const Status& first_failed_status() const {  // Refers to a member: don't outlive this lock.
+      if (!catalog_status_.ok()) {
+        return catalog_status_;
+      }
+      return leader_status_;
+    }
+
     // Check that the catalog manager is initialized. It may or may not be the
     // leader of its Raft configuration.
     //

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/master/master-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test-util.h b/src/kudu/master/master-test-util.h
index da5861b..dc8b0eb 100644
--- a/src/kudu/master/master-test-util.h
+++ b/src/kudu/master/master-test-util.h
@@ -46,7 +46,12 @@ Status WaitForRunningTabletCount(MiniMaster* mini_master,
     resp->Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(expected_count);
-    RETURN_NOT_OK(mini_master->master()->catalog_manager()->GetTableLocations(&req, resp));
+    CatalogManager* catalog = mini_master->master()->catalog_manager();
+    {
+      CatalogManager::ScopedLeaderSharedLock l(catalog);
+      RETURN_NOT_OK(l.first_failed_status());
+      RETURN_NOT_OK(catalog->GetTableLocations(&req, resp));
+    }
     if (resp->tablet_locations_size() >= expected_count) {
       return Status::OK();
     }
@@ -74,7 +79,10 @@ void CreateTabletForTesting(MiniMaster* mini_master,
     req.set_name(table_name);
     req.set_num_replicas(1);
     ASSERT_OK(SchemaToPB(schema, req.mutable_schema()));
-    ASSERT_OK(mini_master->master()->catalog_manager()->CreateTable(&req, &resp, NULL));
+    CatalogManager* catalog = mini_master->master()->catalog_manager();
+    CatalogManager::ScopedLeaderSharedLock l(catalog);
+    ASSERT_OK(l.first_failed_status());
+    ASSERT_OK(catalog->CreateTable(&req, &resp, NULL));
   }
 
   int wait_time = 1000;
@@ -84,7 +92,12 @@ void CreateTabletForTesting(MiniMaster* mini_master,
     IsCreateTableDoneResponsePB resp;
 
     req.mutable_table()->set_table_name(table_name);
-    ASSERT_OK(mini_master->master()->catalog_manager()->IsCreateTableDone(&req, &resp));
+    CatalogManager* catalog = mini_master->master()->catalog_manager();
+    {
+      CatalogManager::ScopedLeaderSharedLock l(catalog);
+      ASSERT_OK(l.first_failed_status());
+      ASSERT_OK(catalog->IsCreateTableDone(&req, &resp));
+    }
     if (resp.done()) {
       is_table_created = true;
       break;
@@ -101,7 +114,10 @@ void CreateTabletForTesting(MiniMaster* mini_master,
     GetTableSchemaRequestPB req;
     GetTableSchemaResponsePB resp;
     req.mutable_table()->set_table_name(table_name);
-    ASSERT_OK(mini_master->master()->catalog_manager()->GetTableSchema(&req, &resp));
+    CatalogManager* catalog = mini_master->master()->catalog_manager();
+    CatalogManager::ScopedLeaderSharedLock l(catalog);
+    ASSERT_OK(l.first_failed_status());
+    ASSERT_OK(catalog->GetTableSchema(&req, &resp));
     ASSERT_TRUE(resp.create_table_done());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0a106063/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index c42d7f0..0910280 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -161,7 +161,7 @@ Status Master::WaitUntilCatalogManagerIsLeaderAndReadyForTests(const MonoDelta&
   do {
     {
       CatalogManager::ScopedLeaderSharedLock l(catalog_manager_.get());
-      if (l.catalog_status().ok() && l.leader_status().ok()) {
+      if (l.first_failed_status().ok()) {
         return Status::OK();
       }
     }


[3/3] incubator-kudu git commit: Fix flaky disk_reservation-itest

Posted by to...@apache.org.
Fix flaky disk_reservation-itest

There are two fixes in this patch for two separate types of failures
seen on Jenkins for this test:

1. Fix a data race in DiskReservationITest.TestFillMultipleDisks

String gflags cannot be modified at runtime in a thread-safe manner, yet
this test was doing exactly that.

Replace the single string gflag, which had to be parsed, with two
path-prefix string flags and two integer "bytes free" override flags (one
per path), for four new test-only gflags in total. Only the integer flags
are modified at runtime.

2. Fix a startup race between the TestWorkload client thread and
   SetFlags() in DiskReservationITest.TestWalWriteToFullDiskAborts

We need to wait for some rows to be written after starting up the
TestWorkload threads in TestWalWriteToFullDiskAborts before we allow the
TS to crash by setting gflags. If we don't, the test gets confused
because the TestWorkload client thread may not be able to resolve where
the tablet is located. The previous failures occurred because the TS
sometimes crashed before it had sent its tablet report to the master.

After applying these changes, I looped disk_reservation-itest 1000x in
TSAN mode and got no failures.

Change-Id: Ica86390b49d459e079807d777e97c47fa35134d1
Reviewed-on: http://gerrit.cloudera.org:8080/3652
Tested-by: Mike Percy <mp...@apache.org>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/2521554b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/2521554b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/2521554b

Branch: refs/heads/master
Commit: 2521554b3f30b8711519c20292b933f15ce2a529
Parents: 0a10606
Author: Mike Percy <mp...@apache.org>
Authored: Thu Jul 14 01:51:36 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Jul 18 23:12:12 2016 +0000

----------------------------------------------------------------------
 .../integration-tests/disk_reservation-itest.cc | 34 +++++++-----
 src/kudu/util/env_util.cc                       | 56 ++++++++++++++------
 2 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2521554b/src/kudu/integration-tests/disk_reservation-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/disk_reservation-itest.cc b/src/kudu/integration-tests/disk_reservation-itest.cc
index 6ed6f30..41cf999 100644
--- a/src/kudu/integration-tests/disk_reservation-itest.cc
+++ b/src/kudu/integration-tests/disk_reservation-itest.cc
@@ -64,6 +64,11 @@ TEST_F(DiskReservationITest, TestFillMultipleDisks) {
   ts_flags.push_back("--disable_core_dumps");
   ts_flags.push_back(Substitute("--fs_data_dirs=$0/a,$0/b",
                                 GetTestDataDirectory()));
+  ts_flags.push_back(Substitute("--disk_reserved_override_prefix_1_path_for_testing=$0/a",
+                                GetTestDataDirectory()));
+  ts_flags.push_back(Substitute("--disk_reserved_override_prefix_2_path_for_testing=$0/b",
+                                GetTestDataDirectory()));
+
   NO_FATALS(StartCluster(ts_flags, {}, 1));
 
   TestWorkload workload(cluster_.get());
@@ -86,12 +91,13 @@ TEST_F(DiskReservationITest, TestFillMultipleDisks) {
 
   LOG(INFO) << "Two log block containers are active";
 
-  // Simulate that /a has 0 bytes free but /b has 1GB free.
+  // Simulate that /a has 0 bytes free.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "disk_reserved_override_prefix_1_bytes_free_for_testing", "0"));
+  // Simulate that /b has 1GB free.
   ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
-                              "disk_reserved_prefixes_with_bytes_free_for_testing",
-                              Substitute("$0/a:0,$0/b:$1",
-                                         GetTestDataDirectory(),
-                                         1L * 1024 * 1024 * 1024)));
+                              "disk_reserved_override_prefix_2_bytes_free_for_testing",
+                              Substitute("$0", 1L * 1024 * 1024 * 1024)));
 
   // Wait until we have 1 unusable container.
   while (true) {
@@ -107,9 +113,7 @@ TEST_F(DiskReservationITest, TestFillMultipleDisks) {
 
   // Now simulate that all disks are full.
   ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
-                              "disk_reserved_prefixes_with_bytes_free_for_testing",
-                              Substitute("$0/a:0,$0/b:0",
-                                         GetTestDataDirectory())));
+                              "disk_reserved_override_prefix_2_bytes_free_for_testing", "0"));
 
   // Wait for crash due to inability to flush or compact.
   ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(10)));
@@ -127,14 +131,20 @@ TEST_F(DiskReservationITest, TestWalWriteToFullDiskAborts) {
   TestWorkload workload(cluster_.get());
   workload.set_num_replicas(1);
   workload.set_timeout_allowed(true); // Allow timeouts because we expect the server to crash.
-  workload.set_write_timeout_millis(100); // Keep test time low after crash.
+  workload.set_write_timeout_millis(200); // Keep test time low after crash.
   // Write lots of data to quickly fill up our 1mb log segment size.
-  workload.set_num_write_threads(8);
-  workload.set_write_batch_size(1024);
-  workload.set_payload_bytes(128);
+  workload.set_num_write_threads(4);
+  workload.set_write_batch_size(10);
+  workload.set_payload_bytes(1000);
   workload.Setup();
   workload.Start();
 
+  // Ensure the cluster is running, the client was able to look up the tablet
+  // locations, etc.
+  while (workload.rows_inserted() < 10) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
   // Set the disk to "nearly full" which should eventually cause a crash at WAL
   // preallocation time.
   ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2521554b/src/kudu/util/env_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_util.cc b/src/kudu/util/env_util.cc
index e2bb489..5218c33 100644
--- a/src/kudu/util/env_util.cc
+++ b/src/kudu/util/env_util.cc
@@ -39,11 +39,31 @@ DEFINE_int64(disk_reserved_bytes_free_for_testing, -1,
 TAG_FLAG(disk_reserved_bytes_free_for_testing, runtime);
 TAG_FLAG(disk_reserved_bytes_free_for_testing, unsafe);
 
-DEFINE_string(disk_reserved_prefixes_with_bytes_free_for_testing, "",
-             "For testing only! Syntax: '/path/a:5,/path/b:7' means a has 5 bytes free, "
-             "b has 7 bytes free. Set to empty string to disable this test-specific override.");
-TAG_FLAG(disk_reserved_prefixes_with_bytes_free_for_testing, runtime);
-TAG_FLAG(disk_reserved_prefixes_with_bytes_free_for_testing, unsafe);
+// We define some flags for testing purposes: Two prefixes and their associated
+// "bytes free" overrides.
+DEFINE_string(disk_reserved_override_prefix_1_path_for_testing, "",
+              "For testing only! Specifies a prefix to override the visible 'bytes free' on. "
+              "Use --disk_reserved_override_prefix_1_bytes_free_for_testing to set the number of "
+              "bytes free for this path prefix. Set to empty string to disable.");
+DEFINE_int64(disk_reserved_override_prefix_1_bytes_free_for_testing, -1,
+             "For testing only! Set number of bytes free on the path prefix specified by "
+             "--disk_reserved_override_prefix_1_path_for_testing. Set to -1 to disable.");
+DEFINE_string(disk_reserved_override_prefix_2_path_for_testing, "",
+              "For testing only! Specifies a prefix to override the visible 'bytes free' on. "
+              "Use --disk_reserved_override_prefix_2_bytes_free_for_testing to set the number of "
+              "bytes free for this path prefix. Set to empty string to disable.");
+DEFINE_int64(disk_reserved_override_prefix_2_bytes_free_for_testing, -1,
+             "For testing only! Set number of bytes free on the path prefix specified by "
+             "--disk_reserved_override_prefix_2_path_for_testing. Set to -1 to disable.");
+//DEFINE_string(disk_reserved_prefixes_with_bytes_free_for_testing, "",
+//             "For testing only! Syntax: '/path/a:5,/path/b:7' means a has 5 bytes free, "
+//             "b has 7 bytes free. Set to empty string to disable this test-specific override.");
+TAG_FLAG(disk_reserved_override_prefix_1_path_for_testing, unsafe);
+TAG_FLAG(disk_reserved_override_prefix_2_path_for_testing, unsafe);
+TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, unsafe);
+TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, unsafe);
+TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, runtime);
+TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, runtime);
 
 using std::shared_ptr;
 using strings::Substitute;
@@ -81,16 +101,17 @@ Status OpenFileForSequential(Env *env, const string &path,
   return Status::OK();
 }
 
-// If we can parse the flag value, and the flag specifies an override for the
-// given path, then override the free bytes to match what is specified in the
-// flag. See definition of disk_reserved_prefixes_with_bytes_free_for_testing.
-static void OverrideBytesFree(const string& path, const string& flag, int64_t* bytes_free) {
-  for (const auto& str : strings::Split(flag, ",")) {
-    pair<string, string> p = strings::Split(str, ":");
-    if (HasPrefixString(path, p.first)) {
-      int64_t free_override;
-      if (!safe_strto64(p.second.c_str(), p.second.size(), &free_override)) return;
-      *bytes_free = free_override;
+// If any of the override gflags specifies an override for the given path, then
+// override the free bytes to match what is specified in the flag. See the
+// definitions of these test-only flags for more information.
+static void OverrideBytesFreeWithTestingFlags(const string& path, int64_t* bytes_free) {
+  const string* prefixes[] = { &FLAGS_disk_reserved_override_prefix_1_path_for_testing,
+                               &FLAGS_disk_reserved_override_prefix_2_path_for_testing };
+  const int64_t* overrides[] = { &FLAGS_disk_reserved_override_prefix_1_bytes_free_for_testing,
+                                 &FLAGS_disk_reserved_override_prefix_2_bytes_free_for_testing };
+  for (int i = 0; i < arraysize(prefixes); i++) {
+    if (*overrides[i] != -1 && !prefixes[i]->empty() && HasPrefixString(path, *prefixes[i])) {
+      *bytes_free = *overrides[i];
       return;
     }
   }
@@ -107,8 +128,9 @@ Status VerifySufficientDiskSpace(Env *env, const std::string& path,
   if (PREDICT_FALSE(FLAGS_disk_reserved_bytes_free_for_testing > -1)) {
     bytes_free = FLAGS_disk_reserved_bytes_free_for_testing;
   }
-  if (PREDICT_FALSE(!FLAGS_disk_reserved_prefixes_with_bytes_free_for_testing.empty())) {
-    OverrideBytesFree(path, FLAGS_disk_reserved_prefixes_with_bytes_free_for_testing, &bytes_free);
+  if (PREDICT_FALSE(FLAGS_disk_reserved_override_prefix_1_bytes_free_for_testing != -1 ||
+                    FLAGS_disk_reserved_override_prefix_2_bytes_free_for_testing != -1)) {
+    OverrideBytesFreeWithTestingFlags(path, &bytes_free);
   }
 
   if (bytes_free - requested_bytes < reserved_bytes) {