You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/06/24 00:41:16 UTC

[1/2] incubator-kudu git commit: catalog_manager: prevent spurious dirty callbacks from crashing the process

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 630a83bd5 -> 47c023f5e


catalog_manager: prevent spurious dirty callbacks from crashing the process

The recent change to switch from LocalConsensus to RaftConsensus has led to
an increased number of dirty callback calls when using just one master. Some
of these calls occur with no term change; the catalog manager treats them as
any other calls and reloads the on-disk metadata.

In theory that's just unnecessary work, but CheckIsLeaderAndReady() doesn't
provide adequate protection for the rest of the master when in this state, so
nearly every RPC is allowed in during this time. That's an absolute disaster
for correctness: imagine a GetTableLocations() returning only a subset of a
table's tablets because the rest were still being loaded from disk.

Luckily for us it can also manifest as a crash [1] so we noticed it quickly.
I chose to fix it by ignoring these calls within
CatalogManager::VisitTablesAndTabletsTask and not
SysCatalogTable::SysCatalogStateChanged because the synchronization in the
former is more straightforward thanks to the size of worker_pool_. The new
test led to a crash 100% of the time without this fix.

There's an argument to be made for changing TableInfo::TabletMap's raw
pointers to shared pointers thus avoiding this crash altogether, but it's
still an incorrect state to be in, so I don't see the value in doing that.

While I was here, I snuck a few other changes in:
- Remove a lock acquisition from ElectedAsLeaderCb; it did nothing.
- Remove old_role_ from SysCatalogTable; it also did nothing.
- Narrow the lock acquisition in VisitTablesAndTabletsTask; it only needs
  to protect the visiting logic.

1. Sample crash output:

F0618 05:47:41.795367  9330 ref_counted.cc:74] Check failed: !in_dtor_
*** Check failure stack trace: ***
    @     0x7f99fab05f7d  google::LogMessage::Fail() at ??:0
    @     0x7f99fab07e7d  google::LogMessage::SendToLog() at ??:0
    @     0x7f99fab05ab9  google::LogMessage::Flush() at ??:0
    @     0x7f99fab0891f  google::LogMessageFatal::~LogMessageFatal() at ??:0
    @     0x7f99fae8a637  kudu::subtle::RefCountedThreadSafeBase::AddRef() at ??:0
    @     0x7f9a07c486d8  make_scoped_refptr<>() at ??:0
    @     0x7f9a07c2f7f7  kudu::master::TableInfo::GetTabletsInRange() at ??:0
    @     0x7f9a07c2ec35  kudu::master::CatalogManager::GetTableLocations() at ??:0
    @           0x51962e  kudu::master::WaitForRunningTabletCount() at ...
    @           0x51ce59  kudu::CreateTableStressTest_RestartMasterDuringCreation_Test::TestBody() at ...
    @     0x7f99fc7f5b48  testing::internal::HandleExceptionsInMethodIfSupported<>() at ??:0
    @     0x7f99fc7ea012  testing::Test::Run() at ??:0
    @     0x7f99fc7ea158  testing::TestInfo::Run() at ??:0
    @     0x7f99fc7ea235  testing::TestCase::Run() at ??:0
    @     0x7f99fc7ea518  testing::internal::UnitTestImpl::RunAllTests() at ??:0
    @     0x7f99fc7f6058  testing::internal::HandleExceptionsInMethodIfSupported<>() at ??:0
    @     0x7f99fc7ea7fd  testing::UnitTest::Run() at ??:0
    @     0x7f9a08520cf6  main at ??:0
    @     0x7f99f97a9d5d  __libc_start_main at ??:0
    @           0x43e06d  (unknown) at ??:0

Change-Id: I5bc602a51a4914ec13f926bfca555c816a2ee509
Reviewed-on: http://gerrit.cloudera.org:8080/3465
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/c266702b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/c266702b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/c266702b

Branch: refs/heads/master
Commit: c266702b908bb2a537fd30d8c4bb2e5cf8144cfc
Parents: 630a83b
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Jun 22 19:29:19 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jun 24 00:40:44 2016 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 46 +++++++++++++++++------------
 src/kudu/master/catalog_manager.h  | 25 ++++------------
 src/kudu/master/master-test.cc     | 52 +++++++++++++++++++++++++++++----
 src/kudu/master/sys_catalog.cc     |  6 ++--
 src/kudu/master/sys_catalog.h      |  1 -
 5 files changed, 82 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c266702b/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 8aeb78e..0a21c5d 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -533,6 +533,9 @@ CatalogManager::CatalogManager(Master *master)
     state_(kConstructed),
     leader_ready_term_(-1) {
   CHECK_OK(ThreadPoolBuilder("leader-initialization")
+           // Presently, this thread pool must contain only a single thread
+           // (to correctly serialize invocations of ElectedAsLeaderCb upon
+           // closely timed consecutive elections).
            .set_max_threads(1)
            .Build(&worker_pool_));
 }
@@ -574,7 +577,6 @@ Status CatalogManager::Init(bool is_first_run) {
 }
 
 Status CatalogManager::ElectedAsLeaderCb() {
-  std::lock_guard<simple_spinlock> l(state_lock_);
   return worker_pool_->SubmitClosure(
       Bind(&CatalogManager::VisitTablesAndTabletsTask, Unretained(this)));
 }
@@ -595,9 +597,20 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
 }
 
 void CatalogManager::VisitTablesAndTabletsTask() {
-
   Consensus* consensus = sys_catalog_->tablet_peer()->consensus();
   int64_t term = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
+  {
+    std::lock_guard<simple_spinlock> l(state_lock_);
+    if (leader_ready_term_ == term) {
+      // The term hasn't changed since the last time this master was the
+      // leader. It's not possible for another master to be leader for the same
+      // term, so there hasn't been any actual leadership change and thus
+      // there's no reason to reload the on-disk metadata.
+      VLOG(2) << Substitute("Term $0 hasn't changed, ignoring dirty callback",
+                            term);
+      return;
+    }
+  }
   Status s = WaitUntilCaughtUpAsLeader(
       MonoDelta::FromMilliseconds(FLAGS_master_failover_catchup_timeout_ms));
   if (!s.ok()) {
@@ -610,28 +623,25 @@ void CatalogManager::VisitTablesAndTabletsTask() {
     return;
   }
 
-  {
-    std::lock_guard<LockType> lock(lock_);
-    int64_t term_after_wait = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
-    if (term_after_wait != term) {
-      // If we got elected leader again while waiting to catch up then we will
-      // get another callback to visit the tables and tablets, so bail.
-      LOG(INFO) << "Term change from " << term << " to " << term_after_wait
-                << " while waiting for master leader catchup. Not loading sys catalog metadata";
-      return;
-    }
+  int64_t term_after_wait = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
+  if (term_after_wait != term) {
+    // If we got elected leader again while waiting to catch up then we will
+    // get another callback to visit the tables and tablets, so bail.
+    LOG(INFO) << "Term change from " << term << " to " << term_after_wait
+        << " while waiting for master leader catchup. Not loading sys catalog metadata";
+    return;
+  }
 
-    LOG(INFO) << "Loading table and tablet metadata into memory...";
-    LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + "Loading metadata into memory") {
-      CHECK_OK(VisitTablesAndTabletsUnlocked());
-    }
+  LOG(INFO) << "Loading table and tablet metadata into memory...";
+  LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + "Loading metadata into memory") {
+    CHECK_OK(VisitTablesAndTablets());
   }
   std::lock_guard<simple_spinlock> l(state_lock_);
   leader_ready_term_ = term;
 }
 
-Status CatalogManager::VisitTablesAndTabletsUnlocked() {
-  DCHECK(lock_.is_locked());
+Status CatalogManager::VisitTablesAndTablets() {
+  std::lock_guard<LockType> lock(lock_);
 
   // Clear the existing state.
   table_names_map_.clear();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c266702b/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 1cc52f0..75d6d8e 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -392,15 +392,16 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   consensus::RaftPeerPB::Role Role() const;
 
  private:
-  // So that the test can call ElectedAsLeaderCb() directly.
+  // These tests call ElectedAsLeaderCb() directly.
   FRIEND_TEST(MasterTest, TestShutdownDuringTableVisit);
+  FRIEND_TEST(MasterTest, TestGetTableLocationsDuringRepeatedTableVisit);
 
   friend class TableLoader;
   friend class TabletLoader;
 
   // Called by SysCatalog::SysCatalogStateChanged when this node
   // becomes the leader of a consensus configuration. Executes VisitTablesAndTabletsTask
-  // below.
+  // via 'worker_pool_'.
   Status ElectedAsLeaderCb();
 
   // Loops and sleeps until one of the following conditions occurs:
@@ -415,24 +416,14 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // reading that data, to ensure consistency across failovers.
   Status WaitUntilCaughtUpAsLeader(const MonoDelta& timeout);
 
-  // This method is submitted to 'leader_initialization_pool_' by
-  // ElectedAsLeaderCb above. It:
-  // 1) Acquired 'lock_'
-  // 2) Resets 'tables_tablets_visited_status_'
-  // 3) Runs VisitTablesAndTabletsUnlocked below
-  // 4) Sets 'tables_tablets_visited_status_' to return value of
-  // the call to VisitTablesAndTabletsUnlocked.
-  // 5) Releases 'lock_' and if successful, updates 'leader_ready_term_'
-  // to true (under state_lock_).
+  // Performs several checks before calling VisitTablesAndTablets to actually
+  // reload table/tablet metadata into memory.
   void VisitTablesAndTabletsTask();
 
   // Clears out the existing metadata ('table_names_map_', 'table_ids_map_',
   // and 'tablet_map_'), loads tables metadata into memory and if successful
   // loads the tablets metadata.
-  //
-  // NOTE: Must be called under external synchronization, see
-  // VisitTablesAndTabletsTask() above.
-  Status VisitTablesAndTabletsUnlocked();
+  Status VisitTablesAndTablets();
 
   // Helper for initializing 'sys_catalog_'. After calling this
   // method, the caller should call WaitUntilRunning() on sys_catalog_
@@ -624,10 +615,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
 
   // Used to defer work from reactor threads onto a thread where
   // blocking behavior is permissible.
-  //
-  // NOTE: Presently, this thread pool must contain only a single
-  // thread (to correctly serialize invocations of ElectedAsLeaderCb
-  // upon closely timed consecutive elections).
   gscoped_ptr<ThreadPool> worker_pool_;
 
   // This field is updated when a node becomes leader master,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c266702b/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 810507f..f3414d5 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -582,11 +582,51 @@ TEST_F(MasterTest, TestShutdownDuringTableVisit) {
   // CatalogManager::VisitTablesAndTabletsTask.
 }
 
-static void GetTableSchema(const char* kTableName,
-                           const Schema* kSchema,
-                           MasterServiceProxy* proxy,
-                           CountDownLatch* started,
-                           AtomicBool* done) {
+// Hammers the master with GetTableLocations() calls.
+static void LoopGetTableLocations(const char* kTableName,
+                                  MasterServiceProxy* proxy,
+                                  AtomicBool* done) {
+  GetTableLocationsRequestPB req;
+  GetTableLocationsResponsePB resp;
+  req.mutable_table()->set_table_name(kTableName);
+
+  while (!done->Load()) {
+    RpcController controller;
+    CHECK_OK(proxy->GetTableLocations(req, &resp, &controller));
+  }
+}
+
+// Tests that the catalog manager handles spurious calls to ElectedAsLeaderCb()
+// (i.e. those without a term change) correctly by ignoring them. If they
+// aren't ignored, a concurrent GetTableLocations() call may trigger a
+// use-after-free.
+TEST_F(MasterTest, TestGetTableLocationsDuringRepeatedTableVisit) {
+  const char* kTableName = "test";
+  Schema schema({ ColumnSchema("key", INT32) }, 1);
+  ASSERT_OK(CreateTable(kTableName, schema));
+
+  AtomicBool done(false);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "getTableLocationsThread",
+                           &LoopGetTableLocations, kTableName,
+                           proxy_.get(), &done, &t));
+
+  // Call ElectedAsLeaderCb() repeatedly. If these spurious calls aren't
+  // ignored, the concurrent GetTableLocations() calls may crash the master.
+  for (int i = 0; i < 100; i++) {
+    master_->catalog_manager()->ElectedAsLeaderCb();
+  }
+  done.Store(true);
+  t->Join();
+}
+
+// Hammers the master with GetTableSchema() calls, checking that the results
+// make sense.
+static void LoopGetTableSchema(const char* kTableName,
+                               const Schema* kSchema,
+                               MasterServiceProxy* proxy,
+                               CountDownLatch* started,
+                               AtomicBool* done) {
   GetTableSchemaRequestPB req;
   GetTableSchemaResponsePB resp;
   req.mutable_table()->set_table_name(kTableName);
@@ -634,7 +674,7 @@ TEST_F(MasterTest, TestGetTableSchemaIsAtomicWithCreateTable) {
   // Kick off a thread that calls GetTableSchema() in a loop.
   scoped_refptr<Thread> t;
   ASSERT_OK(Thread::Create("test", "test",
-                           &GetTableSchema, kTableName, &kTableSchema,
+                           &LoopGetTableSchema, kTableName, &kTableSchema,
                            proxy_.get(), &started, &done, &t));
 
   // Only create the table after the thread has started.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c266702b/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 6e7acb1..dba2d77 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -81,8 +81,7 @@ SysCatalogTable::SysCatalogTable(Master* master, MetricRegistry* metrics,
                                  ElectedLeaderCallback leader_cb)
     : metric_registry_(metrics),
       master_(master),
-      leader_cb_(std::move(leader_cb)),
-      old_role_(RaftPeerPB::FOLLOWER) {
+      leader_cb_(std::move(leader_cb)) {
   CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
 }
 
@@ -237,8 +236,7 @@ void SysCatalogTable::SysCatalogStateChanged(const string& tablet_id, const stri
                         << "Latest consensus state: " << cstate.ShortDebugString();
   RaftPeerPB::Role new_role = GetConsensusRole(tablet_peer_->permanent_uuid(), cstate);
   LOG_WITH_PREFIX(INFO) << "This master's current role is: "
-                        << RaftPeerPB::Role_Name(new_role)
-                        << ", previous role was: " << RaftPeerPB::Role_Name(old_role_);
+                        << RaftPeerPB::Role_Name(new_role);
   if (new_role == RaftPeerPB::LEADER) {
     Status s = leader_cb_.Run();
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c266702b/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 643c06d..efb2125 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -208,7 +208,6 @@ class SysCatalogTable {
   Master* master_;
 
   ElectedLeaderCallback leader_cb_;
-  consensus::RaftPeerPB::Role old_role_;
 
   consensus::RaftPeerPB local_peer_pb_;
 };


[2/2] incubator-kudu git commit: Allow for reserving disk space for non-Kudu processes

Posted by mp...@apache.org.
Allow for reserving disk space for non-Kudu processes

Adds gflags to reserve disk space such that Kudu will not use more than
specified. Hadoop calls this functionality "du.reserved".

If a WAL preallocation is attempted while the log disk is past its
reservation limit, the log write will fail. As a result, RaftConsensus
will cause the process to crash.

The log block manager will use non-full disks if possible until all of
the disks are full. If a flush or compaction is attempted when all disks
are beyond their configured capacity then the LogBlockManager will
return an error. As a result, the maintenance manager task will cause
the process to crash.

This initial implementation provides a "best effort" approach. Disk
space checks are only done at preallocation time, and if writes continue
beyond the preallocated point (for both a WAL segment and a data block)
those writes will not be prevented. This makes it easier to provide a
"friendly" option where the block manager will divert new writes to
non-full disks, avoiding a hard crash when only one disk is past its
reservation limit.

In the future, we may want to add "hard" and "soft" limits, such that
going beyond the soft limit will do what we do today, and going beyond
the hard limit (say, by writing a very large data block past its
preallocation point) will result in a crash.

This patch includes:

* Unit tests.
* End-to-end test for flushing / compaction falling back to non-full
  disks due to disk space backpressure and finally crashing when there
  is no space left in any data dir.
* End-to-end test for writes failing due to WAL disk space backpressure,
  causing a crash.

Change-Id: Ifd0451d4dbddc1783019a53302de0263080939c7
Reviewed-on: http://gerrit.cloudera.org:8080/3135
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Mike Percy <mp...@apache.org>
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/47c023f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/47c023f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/47c023f5

Branch: refs/heads/master
Commit: 47c023f5eba708a184666e53d4a3000177a32fbc
Parents: c266702
Author: Mike Percy <mp...@apache.org>
Authored: Wed May 18 13:17:01 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jun 24 00:41:00 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log-test.cc                  |  21 ++
 src/kudu/consensus/log.cc                       |   9 +
 src/kudu/consensus/log_util.cc                  |   1 -
 src/kudu/fs/block_manager-test.cc               |  49 +++-
 src/kudu/fs/log_block_manager.cc                | 275 +++++++++++++++----
 src/kudu/fs/log_block_manager.h                 |  44 ++-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../integration-tests/disk_reservation-itest.cc | 147 ++++++++++
 src/kudu/tablet/tablet_peer_mm_ops.cc           |   7 +-
 src/kudu/util/CMakeLists.txt                    |   1 +
 src/kudu/util/env_util-test.cc                  |  56 ++++
 src/kudu/util/env_util.cc                       |  64 ++++-
 src/kudu/util/env_util.h                        |   7 +
 src/kudu/util/scoped_cleanup.h                  |   1 +
 14 files changed, 610 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index c9fc41b..b05d1e1 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -35,6 +35,8 @@ DEFINE_int32(num_batches, 10000,
 
 DECLARE_int32(log_min_segments_to_retain);
 DECLARE_double(log_inject_io_error_on_preallocate_fraction);
+DECLARE_int64(fs_wal_dir_reserved_bytes);
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
 
 namespace kudu {
 namespace log {
@@ -1063,5 +1065,24 @@ TEST_F(LogTest, TestFailedLogPreAllocation) {
   ASSERT_STR_CONTAINS(s.ToString(), "Injected IOError");
 }
 
+// Test the enforcement of reserving disk space for the log.
+TEST_F(LogTest, TestDiskSpaceCheck) {
+  FLAGS_fs_wal_dir_reserved_bytes = 1; // Keep at least 1 byte reserved in the FS.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  options_.segment_size_mb = 1;
+  Status s = BuildLog();
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_EQ(ENOSPC, s.posix_code());
+  ASSERT_STR_CONTAINS(s.ToString(), "Insufficient disk space");
+
+  FLAGS_disk_reserved_bytes_free_for_testing = 2 * 1024 * 1024;
+  ASSERT_OK(BuildLog());
+
+  // TODO: We don't currently do bookkeeping to ensure that we check if the
+  // disk is past its quota if we write beyond the preallocation limit for a
+  // single segment. If we did that, we could ensure that we check once we
+  // detect that we are past the preallocation limit.
+}
+
 } // namespace log
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index b577339..c7b9852 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -108,6 +108,11 @@ DEFINE_double(log_inject_io_error_on_preallocate_fraction, 0.0,
 TAG_FLAG(log_inject_io_error_on_preallocate_fraction, unsafe);
 TAG_FLAG(log_inject_io_error_on_preallocate_fraction, runtime);
 
+DEFINE_int64(fs_wal_dir_reserved_bytes, 0,
+             "Number of bytes to reserve on the log directory filesystem for non-Kudu usage");
+TAG_FLAG(fs_wal_dir_reserved_bytes, runtime);
+TAG_FLAG(fs_wal_dir_reserved_bytes, evolving);
+
 // Validate that log_min_segments_to_retain >= 1
 static bool ValidateLogsToRetain(const char* flagname, int value) {
   if (value >= 1) {
@@ -856,6 +861,10 @@ Status Log::PreAllocateNewSegment() {
 
   if (options_.preallocate_segments) {
     TRACE("Preallocating $0 byte segment in $1", max_segment_size_, next_segment_path_);
+    RETURN_NOT_OK(env_util::VerifySufficientDiskSpace(fs_manager_->env(),
+                                                      next_segment_path_,
+                                                      max_segment_size_,
+                                                      FLAGS_fs_wal_dir_reserved_bytes));
     // TODO (perf) zero the new segments -- this could result in
     // additional performance improvements.
     RETURN_NOT_OK(next_segment_file_->PreAllocate(max_segment_size_));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/consensus/log_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 2c29921..80468e1 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -760,7 +760,6 @@ Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer)
   return Status::OK();
 }
 
-
 Status WritableLogSegment::WriteEntryBatch(const Slice& data) {
   DCHECK(is_header_written_);
   DCHECK(!is_footer_written_);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index d586025..8674d5a 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -17,7 +17,6 @@
 
 #include <memory>
 
-
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/log_block_manager.h"
@@ -51,6 +50,12 @@ DEFINE_int32(num_blocks_close, 500,
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 
+DECLARE_int64(fs_data_dirs_reserved_bytes);
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
+
+DECLARE_int32(log_block_manager_full_disk_cache_seconds);
+DECLARE_string(block_manager);
+
 // Generic block manager metrics.
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_reading);
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_writing);
@@ -65,6 +70,15 @@ METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
 METRIC_DECLARE_counter(log_block_manager_containers);
 METRIC_DECLARE_counter(log_block_manager_full_containers);
 
+// The LogBlockManager is only supported on Linux, since it requires hole punching.
+#define RETURN_NOT_LOG_BLOCK_MANAGER() \
+  do { \
+    if (FLAGS_block_manager != "log") { \
+      LOG(INFO) << "This platform does not use the log block manager by default. Skipping test."; \
+      return; \
+    } \
+  } while (false)
+
 namespace kudu {
 namespace fs {
 
@@ -691,15 +705,15 @@ TYPED_TEST(BlockManagerTest, MemTrackerTest) {
   ASSERT_NO_FATAL_FAILURE(this->RunMemTrackerTest());
 }
 
-// The LogBlockManager is only supported on Linux, since it requires hole punching.
-#if defined(__linux__)
-// LogBlockManager-specific tests
+// LogBlockManager-specific tests.
 class LogBlockManagerTest : public BlockManagerTest<LogBlockManager> {
 };
 
 // Regression test for KUDU-1190, a crash at startup when a block ID has been
 // reused.
 TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
+  RETURN_NOT_LOG_BLOCK_MANAGER();
+
   // Set a deterministic random seed, so that we can reproduce the sequence
   // of random numbers.
   bm_->rand_.Reset(1);
@@ -769,6 +783,8 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
 // Note that we rely on filesystem integrity to ensure that we do not lose
 // trailing, fsync()ed metadata.
 TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
+  RETURN_NOT_LOG_BLOCK_MANAGER();
+
   // Create several blocks.
   vector<BlockId> created_blocks;
   BlockId last_block_id;
@@ -931,7 +947,30 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
 }
 
-#endif // defined(__linux__)
+TEST_F(LogBlockManagerTest, TestDiskSpaceCheck) {
+  RETURN_NOT_LOG_BLOCK_MANAGER();
+
+  FLAGS_log_block_manager_full_disk_cache_seconds = 0; // Don't cache device fullness.
+
+  FLAGS_fs_data_dirs_reserved_bytes = 1; // Keep at least 1 byte reserved in the FS.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  FLAGS_log_container_preallocate_bytes = 100;
+
+  vector<BlockId> created_blocks;
+  gscoped_ptr<WritableBlock> writer;
+  Status s = bm_->CreateBlock(&writer);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "All data directories are full");
+
+  FLAGS_disk_reserved_bytes_free_for_testing = 101;
+  ASSERT_OK(bm_->CreateBlock(&writer));
+
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  s = bm_->CreateBlock(&writer);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+
+  ASSERT_OK(writer->Close());
+}
 
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 6b983c3..33f61a8 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -32,6 +32,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/mutex.h"
@@ -42,6 +43,9 @@
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
+DECLARE_bool(enable_data_block_fsync);
+DECLARE_bool(block_manager_lock_dirs);
+
 // TODO: How should this be configured? Should provide some guidance.
 DEFINE_uint64(log_container_max_size, 10LU * 1024 * 1024 * 1024,
               "Maximum size (soft) of a log container");
@@ -57,8 +61,18 @@ DEFINE_bool(log_block_manager_test_hole_punching, true,
 TAG_FLAG(log_block_manager_test_hole_punching, advanced);
 TAG_FLAG(log_block_manager_test_hole_punching, unsafe);
 
-DECLARE_bool(enable_data_block_fsync);
-DECLARE_bool(block_manager_lock_dirs);
+DEFINE_int32(log_block_manager_full_disk_cache_seconds, 30,
+             "Number of seconds we cache the full-disk status in the block manager. "
+             "During this time, writes to the corresponding root path will not be attempted.");
+TAG_FLAG(log_block_manager_full_disk_cache_seconds, advanced);
+TAG_FLAG(log_block_manager_full_disk_cache_seconds, evolving);
+
+DEFINE_int64(fs_data_dirs_reserved_bytes, 0,
+             "Number of bytes to reserve on each data directory filesystem for non-Kudu usage. "
+             "Only works with the log block manager and when --log_container_preallocate_bytes "
+             "is non-zero.");
+TAG_FLAG(fs_data_dirs_reserved_bytes, runtime);
+TAG_FLAG(fs_data_dirs_reserved_bytes, evolving);
 
 METRIC_DEFINE_gauge_uint64(server, log_block_manager_bytes_under_management,
                            "Bytes Under Management",
@@ -80,19 +94,23 @@ METRIC_DEFINE_counter(server, log_block_manager_full_containers,
                       kudu::MetricUnit::kLogBlockContainers,
                       "Number of full log block containers");
 
-using std::unordered_map;
-using std::unordered_set;
-using strings::Substitute;
+METRIC_DEFINE_counter(server, log_block_manager_unavailable_containers,
+                      "Number of Unavailable Log Block Containers",
+                      kudu::MetricUnit::kLogBlockContainers,
+                      "Number of non-full log block containers that are under root paths "
+                      "whose disks are full");
+
 using kudu::env_util::ScopedFileDeleter;
 using kudu::fs::internal::LogBlock;
 using kudu::fs::internal::LogBlockContainer;
 using kudu::pb_util::ReadablePBContainerFile;
 using kudu::pb_util::WritablePBContainerFile;
+using std::unordered_map;
+using std::unordered_set;
+using strings::Substitute;
 
 namespace kudu {
-
 namespace fs {
-
 namespace internal {
 
 ////////////////////////////////////////////////////////////
@@ -114,6 +132,7 @@ struct LogBlockManagerMetrics {
 
   scoped_refptr<Counter> containers;
   scoped_refptr<Counter> full_containers;
+  scoped_refptr<Counter> unavailable_containers;
 };
 
 #define MINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity))
@@ -123,7 +142,8 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
     GINIT(bytes_under_management),
     GINIT(blocks_under_management),
     MINIT(containers),
-    MINIT(full_containers) {
+    MINIT(full_containers),
+    MINIT(unavailable_containers) {
 }
 #undef GINIT
 #undef MINIT
@@ -220,6 +240,12 @@ class LogBlockContainer {
   // beginning from the position where the last written block ended.
   Status Preallocate(size_t length);
 
+  // Returns the path to the metadata file.
+  string MetadataFilePath() const;
+
+  // Returns the path to the data file.
+  string DataFilePath() const;
+
   // Reads the container's metadata from disk, sanity checking and
   // returning the records.
   Status ReadContainerRecords(deque<BlockRecordPB>* records) const;
@@ -243,10 +269,11 @@ class LogBlockContainer {
   LogBlockManager* block_manager() const { return block_manager_; }
   int64_t total_bytes_written() const { return total_bytes_written_; }
   bool full() const {
-    return total_bytes_written_ >=  FLAGS_log_container_max_size;
+    return total_bytes_written_ >= FLAGS_log_container_max_size;
   }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
   const PathInstanceMetadataPB* instance() const { return instance_; }
+  const std::string& root_path() const { return root_path_; }
 
  private:
   // RAII-style class for finishing containers in FinishBlock().
@@ -263,8 +290,8 @@ class LogBlockContainer {
     LogBlockContainer* container_;
   };
 
-  LogBlockContainer(LogBlockManager* block_manager,
-                    PathInstanceMetadataPB* instance, std::string path,
+  LogBlockContainer(LogBlockManager* block_manager, PathInstanceMetadataPB* instance,
+                    std::string root_path, std::string path,
                     gscoped_ptr<WritablePBContainerFile> metadata_writer,
                     gscoped_ptr<RWFile> data_file);
 
@@ -275,10 +302,17 @@ class LogBlockContainer {
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
 
+  // The path to the container's root path. This is the root directory under
+  // which the container lives.
+  const std::string root_path_;
+
   // The path to the container's files. Equivalent to "<dir>/<id>" (see the
   // container constructor).
   const std::string path_;
 
+  // Offset up to which we have preallocated bytes.
+  int64_t preallocated_offset_ = 0;
+
   // Opened file handles to the container's files.
   //
   // RWFile is not thread safe so access to each writer must be
@@ -290,7 +324,7 @@ class LogBlockContainer {
   gscoped_ptr<RWFile> data_file_;
 
   // The amount of data written thus far in the container.
-  int64_t total_bytes_written_;
+  int64_t total_bytes_written_ = 0;
 
   // The metrics. Not owned by the log container; it has the same lifespan
   // as the block manager.
@@ -303,19 +337,19 @@ class LogBlockContainer {
 
 LogBlockContainer::LogBlockContainer(
     LogBlockManager* block_manager, PathInstanceMetadataPB* instance,
-    string path, gscoped_ptr<WritablePBContainerFile> metadata_writer,
+    string root_path, string path, gscoped_ptr<WritablePBContainerFile> metadata_writer,
     gscoped_ptr<RWFile> data_file)
     : block_manager_(block_manager),
+      root_path_(std::move(root_path)),
       path_(std::move(path)),
       metadata_pb_writer_(std::move(metadata_writer)),
       data_file_(std::move(data_file)),
-      total_bytes_written_(0),
       metrics_(block_manager->metrics()),
       instance_(instance) {}
 
 Status LogBlockContainer::Create(LogBlockManager* block_manager,
                                  PathInstanceMetadataPB* instance,
-                                 const string& dir,
+                                 const string& root_path,
                                  gscoped_ptr<LogBlockContainer>* container) {
   string common_path;
   string metadata_path;
@@ -334,7 +368,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     if (metadata_writer) {
       block_manager->env()->DeleteFile(metadata_path);
     }
-    common_path = JoinPathSegments(dir, block_manager->oid_generator()->Next());
+    common_path = JoinPathSegments(root_path, block_manager->oid_generator()->Next());
     metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
     metadata_status = block_manager->env()->NewRWFile(wr_opts,
                                                       metadata_path,
@@ -356,6 +390,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     RETURN_NOT_OK(metadata_pb_writer->Init(BlockRecordPB()));
     container->reset(new LogBlockContainer(block_manager,
                                            instance,
+                                           root_path,
                                            common_path,
                                            std::move(metadata_pb_writer),
                                            std::move(data_file)));
@@ -368,9 +403,9 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
 
 Status LogBlockContainer::Open(LogBlockManager* block_manager,
                                PathInstanceMetadataPB* instance,
-                               const string& dir, const string& id,
+                               const string& root_path, const string& id,
                                gscoped_ptr<LogBlockContainer>* container) {
-  string common_path = JoinPathSegments(dir, id);
+  string common_path = JoinPathSegments(root_path, id);
 
   // Open the existing metadata and data files for writing.
   string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
@@ -396,6 +431,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   // Create the in-memory container and populate it.
   gscoped_ptr<LogBlockContainer> open_container(new LogBlockContainer(block_manager,
                                                                       instance,
+                                                                      root_path,
                                                                       common_path,
                                                                       std::move(metadata_pb_writer),
                                                                       std::move(data_file)));
@@ -404,8 +440,16 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   return Status::OK();
 }
 
+string LogBlockContainer::MetadataFilePath() const {
+  return StrCat(path_, LogBlockManager::kContainerMetadataFileSuffix);
+}
+
+string LogBlockContainer::DataFilePath() const {
+  return StrCat(path_, LogBlockManager::kContainerDataFileSuffix);
+}
+
 Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) const {
-  string metadata_path = StrCat(path_, LogBlockManager::kContainerMetadataFileSuffix);
+  string metadata_path = MetadataFilePath();
   gscoped_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
   ReadablePBContainerFile pb_reader(std::move(metadata_reader));
@@ -527,6 +571,8 @@ Status LogBlockContainer::ReadData(int64_t offset, size_t length,
 }
 
 Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) {
+  // Note: We don't check for sufficient disk space for metadata writes in
+  // order to allow for block deletion on full disks.
   std::lock_guard<Mutex> l(metadata_pb_writer_lock_);
   return metadata_pb_writer_->Append(pb);
 }
@@ -561,7 +607,9 @@ Status LogBlockContainer::SyncMetadata() {
 }
 
 Status LogBlockContainer::Preallocate(size_t length) {
-  return data_file_->PreAllocate(total_bytes_written(), length);
+  RETURN_NOT_OK(data_file_->PreAllocate(total_bytes_written(), length));
+  preallocated_offset_ = total_bytes_written() + length;
+  return Status::OK();
 }
 
 void LogBlockContainer::UpdateBytesWritten(int64_t more_bytes) {
@@ -823,7 +871,7 @@ Status LogWritableBlock::FlushDataAsync() {
     VLOG(3) << "Flushing block " << id();
     RETURN_NOT_OK(container_->FlushData(block_offset_, block_length_));
 
-    RETURN_NOT_OK(AppendMetadata());
+    RETURN_NOT_OK_PREPEND(AppendMetadata(), "Unable to append block metadata");
 
     // TODO: Flush just the range we care about.
     RETURN_NOT_OK(container_->FlushMetadata());
@@ -861,7 +909,7 @@ Status LogWritableBlock::DoClose(SyncMode mode) {
     // FlushDataAsync() was not called; append the metadata now.
     if (state_ == CLEAN || state_ == DIRTY) {
       s = AppendMetadata();
-      RETURN_NOT_OK(s);
+      RETURN_NOT_OK_PREPEND(s, "Unable to flush block during close");
     }
 
     if (mode == SYNC &&
@@ -1180,48 +1228,97 @@ Status LogBlockManager::Open() {
   return Status::OK();
 }
 
-
 Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                     gscoped_ptr<WritableBlock>* block) {
   CHECK(!read_only_);
 
+  // Root paths that are below their reserved space threshold. Initialize the
+  // paths from the FullDiskCache. This function-local cache is necessary for
+  // correctness in case the FullDiskCache expiration time is set to 0.
+  unordered_set<string> full_root_paths(root_paths_.size());
+  for (int i = 0; i < root_paths_.size(); i++) {
+    if (full_disk_cache_.IsRootFull(root_paths_[i])) {
+      InsertOrDie(&full_root_paths, root_paths_[i]);
+    }
+  }
+
   // Find a free container. If one cannot be found, create a new one.
+  // In case one or more root paths have hit their reserved space limit, we
+  // retry until we have exhausted all root paths.
   //
   // TODO: should we cap the number of outstanding containers and force
   // callers to block if we've reached it?
-  LogBlockContainer* container = GetAvailableContainer();
-  if (!container) {
-    // Round robin through the root paths to select where the next
-    // container should live.
-    int32 old_idx;
-    int32 new_idx;
-    do {
-      old_idx = root_paths_idx_.Load();
-      new_idx = (old_idx + 1) % root_paths_.size();
-    } while (!root_paths_idx_.CompareAndSet(old_idx, new_idx));
-    string root_path = root_paths_[old_idx];
-
-    // Guaranteed by LogBlockManager::Open().
-    PathInstanceMetadataFile* instance = FindOrDie(instances_by_root_path_, root_path);
-
-    gscoped_ptr<LogBlockContainer> new_container;
-    RETURN_NOT_OK(LogBlockContainer::Create(this,
-                                            instance->metadata(),
-                                            root_path,
-                                            &new_container));
-    container = new_container.release();
-    {
-      std::lock_guard<simple_spinlock> l(lock_);
-      dirty_dirs_.insert(root_path);
-      AddNewContainerUnlocked(container);
+  LogBlockContainer* container = nullptr;
+  while (!container) {
+    container = GetAvailableContainer(full_root_paths);
+    if (!container) {
+      // If all root paths are full, we cannot allocate a block.
+      if (full_root_paths.size() == root_paths_.size()) {
+        return Status::IOError("Unable to allocate block: All data directories are full. "
+                               "Please free some disk space or consider changing the "
+                               "fs_data_dirs_reserved_bytes configuration parameter",
+                               "", ENOSPC);
+      }
+      // Round robin through the root paths to select where the next
+      // container should live.
+      // TODO: Consider a more random scheme for block placement.
+      int32 cur_idx;
+      int32 next_idx;
+      do {
+        cur_idx = root_paths_idx_.Load();
+        next_idx = (cur_idx + 1) % root_paths_.size();
+      } while (!root_paths_idx_.CompareAndSet(cur_idx, next_idx) ||
+               ContainsKey(full_root_paths, root_paths_[cur_idx]));
+      string root_path = root_paths_[cur_idx];
+      if (full_disk_cache_.IsRootFull(root_path)) {
+        InsertOrDie(&full_root_paths, root_path);
+        continue;
+      }
+
+      // Guaranteed by LogBlockManager::Open().
+      PathInstanceMetadataFile* instance = FindOrDie(instances_by_root_path_, root_path);
+
+      gscoped_ptr<LogBlockContainer> new_container;
+      RETURN_NOT_OK_PREPEND(LogBlockContainer::Create(this,
+                                                      instance->metadata(),
+                                                      root_path,
+                                                      &new_container),
+                            "Could not create new log block container at " + root_path);
+      container = new_container.release();
+      {
+        std::lock_guard<simple_spinlock> l(lock_);
+        dirty_dirs_.insert(root_path);
+        AddNewContainerUnlocked(container);
+      }
     }
-  }
 
-  // By preallocating with each CreateBlock(), we're effectively
-  // maintaining a rolling buffer of preallocated data just ahead of where
-  // the next write will fall.
-  if (FLAGS_log_container_preallocate_bytes) {
-    RETURN_NOT_OK(container->Preallocate(FLAGS_log_container_preallocate_bytes));
+    // By preallocating with each CreateBlock(), we're effectively
+    // maintaining a rolling buffer of preallocated data just ahead of where
+    // the next write will fall.
+    if (FLAGS_log_container_preallocate_bytes) {
+      // TODO: The use of FLAGS_log_container_preallocate_bytes may be a poor
+      // estimate for the number of bytes we are about to consume for a block.
+      // In the future, we may also want to implement some type of "hard" limit
+      // to ensure that a giant block doesn't blow through the configured
+      // reserved disk space.
+      Status s = env_util::VerifySufficientDiskSpace(env_, container->DataFilePath(),
+                                                     FLAGS_log_container_preallocate_bytes,
+                                                     FLAGS_fs_data_dirs_reserved_bytes);
+      if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
+        LOG(ERROR) << Substitute("Log block manager: Insufficient disk space under path $0: "
+                                 "Creation of new data blocks under this path can be retried after "
+                                 "$1 seconds: $2", container->root_path(),
+                                 FLAGS_log_block_manager_full_disk_cache_seconds, s.ToString());
+        // Blacklist this root globally and locally.
+        full_disk_cache_.MarkRootFull(container->root_path());
+        InsertOrDie(&full_root_paths, container->root_path());
+        MakeContainerAvailable(container);
+        container = nullptr;
+        continue;
+      }
+      RETURN_NOT_OK(s); // Catch other types of IOErrors, etc.
+      RETURN_NOT_OK(container->Preallocate(FLAGS_log_container_preallocate_bytes));
+    }
   }
 
   // Generate a free block ID.
@@ -1277,7 +1374,8 @@ Status LogBlockManager::DeleteBlock(const BlockId& block_id) {
   block_id.CopyToPB(record.mutable_block_id());
   record.set_op_type(DELETE);
   record.set_timestamp_us(GetCurrentTimeMicros());
-  RETURN_NOT_OK(lb->container()->AppendMetadata(record));
+  RETURN_NOT_OK_PREPEND(lb->container()->AppendMetadata(record),
+                        "Unable to append deletion record to block metadata");
 
   // We don't bother fsyncing the metadata append for deletes in order to avoid
   // the disk overhead. Even if we did fsync it, we'd still need to account for
@@ -1321,13 +1419,49 @@ void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
   }
 }
 
-LogBlockContainer* LogBlockManager::GetAvailableContainer() {
+LogBlockContainer* LogBlockManager::GetAvailableContainer(
+    const unordered_set<string>& full_root_paths) {
   LogBlockContainer* container = nullptr;
-  std::lock_guard<simple_spinlock> l(lock_);
-  if (!available_containers_.empty()) {
-    container = available_containers_.front();
-    available_containers_.pop_front();
+  int64_t disk_full_containers_delta = 0;
+  MonoTime now = MonoTime::Now(MonoTime::FINE);
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    // Move containers from disk_full -> available.
+    while (!disk_full_containers_.empty() &&
+           disk_full_containers_.top().second.ComesBefore(now)) {
+      available_containers_.push_back(disk_full_containers_.top().first);
+      disk_full_containers_.pop();
+      disk_full_containers_delta -= 1;
+    }
+
+    // Return the first currently-available non-full-disk container (according to
+    // our full-disk cache).
+    while (!container && !available_containers_.empty()) {
+      container = available_containers_.front();
+      available_containers_.pop_front();
+      MonoTime expires;
+      // Note: We must check 'full_disk_cache_' before 'full_root_paths' in
+      // order to correctly use the expiry time provided by 'full_disk_cache_'.
+      if (full_disk_cache_.IsRootFull(container->root_path(), &expires) ||
+          ContainsKey(full_root_paths, container->root_path())) {
+        if (!expires.Initialized()) {
+          // It's no longer in the cache but we still consider it unusable.
+          // It will be moved back into 'available_containers_' on the next call.
+          expires = now;
+        }
+        disk_full_containers_.emplace(container, expires);
+        disk_full_containers_delta += 1;
+        container = nullptr;
+      }
+    }
   }
+
+  // Update the metrics in a batch.
+  if (metrics()) {
+    metrics()->unavailable_containers->IncrementBy(disk_full_containers_delta);
+  }
+
+  // Return the container we found, or null if we don't have anything available.
   return container;
 }
 
@@ -1639,5 +1773,26 @@ std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer*
   return container->ToString();
 }
 
+bool FullDiskCache::IsRootFull(const std::string& root_path, MonoTime* expires_out) const {
+  const MonoTime* expires;
+  {
+    shared_lock<rw_spinlock> l(&lock_.get_lock());
+    expires = FindOrNull(cache_, root_path);
+  }
+  if (expires == nullptr) return false; // No entry exists.
+  if (expires->ComesBefore(MonoTime::Now(MonoTime::FINE))) return false; // Expired.
+  if (expires_out != nullptr) {
+    *expires_out = *expires;
+  }
+  return true; // Root is still full according to the cache.
+}
+
+void FullDiskCache::MarkRootFull(const string& root_path) {
+  MonoTime expires = MonoTime::Now(MonoTime::FINE);
+  expires.AddDelta(MonoDelta::FromSeconds(FLAGS_log_block_manager_full_disk_cache_seconds));
+  std::lock_guard<percpu_rwlock> l(lock_);
+  InsertOrUpdate(&cache_, root_path, expires); // Last one wins.
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 74d529f..01672ec 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -21,6 +21,7 @@
 #include <deque>
 #include <gtest/gtest_prod.h>
 #include <memory>
+#include <queue>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -148,6 +149,22 @@ struct LogBlockManagerMetrics;
 // - Evaluate and implement a solution for data integrity (e.g. per-block
 //   checksum).
 
+// A thread-safe cache that indicates whether a root path is full or not.
+// Includes expiration of the items in the cache. Cache entries are never deleted.
+class FullDiskCache {
+ public:
+  // Returns true if the given 'root_path' has been marked full and the
+  // associated cache entry has not expired.
+  bool IsRootFull(const std::string& root_path, MonoTime* expires_out = nullptr) const;
+
+  // Marks the given 'root_path' as "full".
+  void MarkRootFull(const std::string& root_path);
+
+ private:
+  mutable percpu_rwlock lock_;
+  std::unordered_map<std::string, MonoTime> cache_;
+};
+
 // The log-backed block manager.
 class LogBlockManager : public BlockManager {
  public:
@@ -199,6 +216,15 @@ class LogBlockManager : public BlockManager {
       BlockIdEqual,
       BlockAllocator> BlockMap;
 
+  typedef std::pair<internal::LogBlockContainer*, MonoTime> ExpiringContainerPair;
+
+  class ExpiringContainerPairGreaterThanFunctor {
+   public:
+    bool operator()(const ExpiringContainerPair& a, const ExpiringContainerPair& b) {
+      return b.second.ComesBefore(a.second);
+    }
+  };
+
   // Adds an as of yet unseen container to this block manager.
   void AddNewContainerUnlocked(internal::LogBlockContainer* container);
 
@@ -208,7 +234,11 @@ class LogBlockManager : public BlockManager {
   // After returning, the container is considered to be in use. When
   // writing is finished, call MakeContainerAvailable() to make it
   // available to other writers.
-  internal::LogBlockContainer* GetAvailableContainer();
+  //
+  // 'full_root_paths' is a blacklist containing root paths that are full.
+  // Containers with root paths in this list will not be returned.
+  internal::LogBlockContainer* GetAvailableContainer(
+      const std::unordered_set<std::string>& full_root_paths);
 
   // Indicate that this container is no longer in use and can be handed out
   // to other writers.
@@ -301,6 +331,14 @@ class LogBlockManager : public BlockManager {
   // Does not own the containers.
   std::deque<internal::LogBlockContainer*> available_containers_;
 
+  // Holds only those containers that would be available, were they not on
+  // disks that are past their capacity. This priority queue consists of pairs
+  // of containers and timestamps. Those timestamps represent the next time
+  // that we should check whether the disk is full. The top of the priority
+  // queue is the lowest timestamp.
+  std::priority_queue<ExpiringContainerPair, std::vector<ExpiringContainerPair>,
+                      ExpiringContainerPairGreaterThanFunctor> disk_full_containers_;
+
   // Tracks dirty container directories.
   //
   // Synced and cleared by SyncMetadata().
@@ -327,6 +365,10 @@ class LogBlockManager : public BlockManager {
   typedef std::unordered_map<std::string, ThreadPool*> ThreadPoolMap;
   ThreadPoolMap thread_pools_by_root_path_;
 
+  // A cache of which root paths are full as of the last time they were
+  // checked. This cache expires its entries after some period of time.
+  FullDiskCache full_disk_cache_;
+
   // For generating container names.
   ObjectIdGenerator oid_generator_;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 20b573e..9b1d88d 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -47,6 +47,7 @@ ADD_KUDU_TEST(client_failover-itest)
 ADD_KUDU_TEST(client-stress-test
   RESOURCE_LOCK "master-rpc-ports"
   RUN_SERIAL true)
+ADD_KUDU_TEST(disk_reservation-itest)
 ADD_KUDU_TEST(master_replication-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(master_failover-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/integration-tests/disk_reservation-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/disk_reservation-itest.cc b/src/kudu/integration-tests/disk_reservation-itest.cc
new file mode 100644
index 0000000..f2a2d1c
--- /dev/null
+++ b/src/kudu/integration-tests/disk_reservation-itest.cc
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/util/metrics.h"
+
+using std::string;
+using strings::Substitute;
+
+DECLARE_string(block_manager);
+
+METRIC_DECLARE_entity(server);
+METRIC_DECLARE_counter(log_block_manager_containers);
+METRIC_DECLARE_counter(log_block_manager_unavailable_containers);
+
+namespace kudu {
+
+namespace {
+Status GetTsCounterValue(ExternalTabletServer* ets, MetricPrototype* metric, int64_t* value) {
+  return ets->GetInt64Metric(
+             &METRIC_ENTITY_server,
+             "kudu.tabletserver",
+             metric,
+             "value",
+             value);
+}
+} // namespace
+
+class DiskReservationITest : public ExternalMiniClusterITestBase {
+};
+
+// Test that when we fill up a disk beyond its configured reservation limit, we
+// use other disks for data blocks until all disks are full, at which time we
+// crash. This functionality is only implemented in the log block manager.
+TEST_F(DiskReservationITest, TestFillMultipleDisks) {
+  if (FLAGS_block_manager != "log") {
+    LOG(INFO) << "This platform does not use the log block manager by default. Skipping test.";
+    return;
+  }
+
+  // Set up the tablet so that flushes are constantly occurring.
+  vector<string> ts_flags;
+  ts_flags.push_back("--flush_threshold_mb=0");
+  ts_flags.push_back("--maintenance_manager_polling_interval_ms=100");
+  ts_flags.push_back(Substitute("--fs_data_dirs=$0/a,$0/b",
+                                GetTestDataDirectory()));
+  NO_FATALS(StartCluster(ts_flags, {}, 1));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  // Use a short timeout so that at the end of the test, when we expect a
+  // crash, stopping the workload and joining the client threads is quick.
+  workload.set_timeout_allowed(true);
+  workload.set_write_timeout_millis(100);
+  workload.Setup();
+  workload.Start();
+
+  // Wait until we have 2 active containers.
+  while (true) {
+    int64_t num_containers;
+    ASSERT_OK(GetTsCounterValue(cluster_->tablet_server(0), &METRIC_log_block_manager_containers,
+                                &num_containers));
+    if (num_containers >= 2) break;
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  LOG(INFO) << "Two log block containers are active";
+
+  // Simulate that /a has 0 bytes free but /b has 1GB free.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "disk_reserved_prefixes_with_bytes_free_for_testing",
+                              Substitute("$0/a:0,$0/b:$1",
+                                         GetTestDataDirectory(),
+                                         1L * 1024 * 1024 * 1024)));
+
+  // Wait until we have 1 unusable container.
+  while (true) {
+    int64_t num_unavailable_containers;
+    ASSERT_OK(GetTsCounterValue(cluster_->tablet_server(0),
+                                &METRIC_log_block_manager_unavailable_containers,
+                                &num_unavailable_containers));
+    if (num_unavailable_containers >= 1) break;
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  LOG(INFO) << "Have 1 unavailable log block container";
+
+  // Now simulate that all disks are full.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "disk_reserved_prefixes_with_bytes_free_for_testing",
+                              Substitute("$0/a:0,$0/b:0",
+                                         GetTestDataDirectory())));
+
+  // Wait for crash due to inability to flush or compact.
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(10)));
+  workload.StopAndJoin();
+}
+
+// When the WAL disk goes beyond its configured reservation, attempts to write
+// to the WAL should cause a fatal error.
+TEST_F(DiskReservationITest, TestWalWriteToFullDiskAborts) {
+  vector<string> ts_flags;
+  ts_flags.push_back("--log_segment_size_mb=1"); // Encourage log rolling to speed up the test.
+  NO_FATALS(StartCluster(ts_flags, {}, 1));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  workload.set_timeout_allowed(true); // Allow timeouts because we expect the server to crash.
+  workload.set_write_timeout_millis(100); // Keep test time low after crash.
+  // Write lots of data to quickly fill up our 1mb log segment size.
+  workload.set_num_write_threads(8);
+  workload.set_write_batch_size(1024);
+  workload.set_payload_bytes(128);
+  workload.Setup();
+  workload.Start();
+
+  // Set the disk to "nearly full" which should eventually cause a crash at WAL
+  // preallocation time.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "fs_wal_dir_reserved_bytes", "10000000"));
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "disk_reserved_bytes_free_for_testing", "10000001"));
+
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(10)));
+  workload.StopAndJoin();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/tablet/tablet_peer_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.cc b/src/kudu/tablet/tablet_peer_mm_ops.cc
index a1fc4df..3520d1b 100644
--- a/src/kudu/tablet/tablet_peer_mm_ops.cc
+++ b/src/kudu/tablet/tablet_peer_mm_ops.cc
@@ -173,9 +173,10 @@ void FlushDeltaMemStoresOp::Perform() {
     LOG(WARNING) << "Won't flush deltas since tablet shutting down: " << tablet_peer_->tablet_id();
     return;
   }
-  WARN_NOT_OK(tablet_peer_->tablet()->FlushDMSWithHighestRetention(max_idx_to_segment_size),
-                  Substitute("Failed to flush DMS on $0",
-                             tablet_peer_->tablet()->tablet_id()));
+  KUDU_CHECK_OK_PREPEND(tablet_peer_->tablet()->FlushDMSWithHighestRetention(
+                            max_idx_to_segment_size),
+                        Substitute("Failed to flush DMS on $0",
+                                   tablet_peer_->tablet()->tablet_id()));
   {
     std::lock_guard<simple_spinlock> l(lock_);
     time_since_flush_.start();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 1558b5d..843fd62 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -268,6 +268,7 @@ ADD_KUDU_TEST(countdown_latch-test)
 ADD_KUDU_TEST(crc-test RUN_SERIAL true) # has a benchmark
 ADD_KUDU_TEST(debug-util-test)
 ADD_KUDU_TEST(env-test LABELS no_tsan)
+ADD_KUDU_TEST(env_util-test)
 ADD_KUDU_TEST(errno-test)
 ADD_KUDU_TEST(failure_detector-test)
 ADD_KUDU_TEST(flag_tags-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/env_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_util-test.cc b/src/kudu/util/env_util-test.cc
new file mode 100644
index 0000000..755aaaf
--- /dev/null
+++ b/src/kudu/util/env_util-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/env_util.h"
+
+#include <gflags/gflags.h>
+#include <memory>
+#include <sys/statvfs.h>
+
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(disk_reserved_bytes);
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
+
+namespace kudu {
+
+using std::string;
+using std::unique_ptr;
+
+class EnvUtilTest: public KuduTest {
+};
+
+TEST_F(EnvUtilTest, TestDiskSpaceCheck) {
+  Env* env = Env::Default();
+  const string kTestPath = GetTestDataDirectory();
+
+  const int64_t kRequestedBytes = 0;
+  int64_t reserved_bytes = 0;
+  ASSERT_OK(env_util::VerifySufficientDiskSpace(env, kTestPath, kRequestedBytes, reserved_bytes));
+
+  // Make it seem as if the disk is full and specify that we should have
+  // reserved 200 bytes. Even asking for 0 bytes should return an error
+  // indicating we are out of space.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  reserved_bytes = 200;
+  Status s = env_util::VerifySufficientDiskSpace(env, kTestPath, kRequestedBytes, reserved_bytes);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_EQ(ENOSPC, s.posix_code());
+  ASSERT_STR_CONTAINS(s.ToString(), "Insufficient disk space");
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/env_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_util.cc b/src/kudu/util/env_util.cc
index b6b2244..e2bb489 100644
--- a/src/kudu/util/env_util.cc
+++ b/src/kudu/util/env_util.cc
@@ -16,18 +16,37 @@
 // under the License.
 
 #include <algorithm>
-#include <memory>
-
+#include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <memory>
 #include <string>
+#include <utility>
 
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/debug-util.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/status.h"
+#include "kudu/util/flag_tags.h"
+
+DEFINE_int64(disk_reserved_bytes_free_for_testing, -1,
+             "For testing only! Set to number of bytes free on each filesystem. "
+             "Set to -1 to disable this test-specific override");
+TAG_FLAG(disk_reserved_bytes_free_for_testing, runtime);
+TAG_FLAG(disk_reserved_bytes_free_for_testing, unsafe);
+
+DEFINE_string(disk_reserved_prefixes_with_bytes_free_for_testing, "",
+             "For testing only! Syntax: '/path/a:5,/path/b:7' means a has 5 bytes free, "
+             "b has 7 bytes free. Set to empty string to disable this test-specific override.");
+TAG_FLAG(disk_reserved_prefixes_with_bytes_free_for_testing, runtime);
+TAG_FLAG(disk_reserved_prefixes_with_bytes_free_for_testing, unsafe);
 
-using strings::Substitute;
 using std::shared_ptr;
+using strings::Substitute;
 
 namespace kudu {
 namespace env_util {
@@ -62,6 +81,45 @@ Status OpenFileForSequential(Env *env, const string &path,
   return Status::OK();
 }
 
+// If the flag contains an entry whose path is a prefix of 'path', set
+// '*bytes_free' to that entry's byte count, provided it parses (only the first
+// match is used). See disk_reserved_prefixes_with_bytes_free_for_testing.
+static void OverrideBytesFree(const string& path, const string& flag, int64_t* bytes_free) {
+  for (const auto& str : strings::Split(flag, ",")) {
+    pair<string, string> p = strings::Split(str, ":");
+    if (HasPrefixString(path, p.first)) {
+      int64_t free_override;
+      if (!safe_strto64(p.second.c_str(), p.second.size(), &free_override)) return;
+      *bytes_free = free_override;
+      return;
+    }
+  }
+}
+
+Status VerifySufficientDiskSpace(Env *env, const std::string& path,
+                                 int64_t requested_bytes, int64_t reserved_bytes) {
+  DCHECK_GE(requested_bytes, 0);
+
+  int64_t bytes_free;
+  RETURN_NOT_OK(env->GetBytesFree(path, &bytes_free));
+
+  // Allow overriding these values by tests.
+  if (PREDICT_FALSE(FLAGS_disk_reserved_bytes_free_for_testing > -1)) {
+    bytes_free = FLAGS_disk_reserved_bytes_free_for_testing;
+  }
+  if (PREDICT_FALSE(!FLAGS_disk_reserved_prefixes_with_bytes_free_for_testing.empty())) {
+    OverrideBytesFree(path, FLAGS_disk_reserved_prefixes_with_bytes_free_for_testing, &bytes_free);
+  }
+
+  if (bytes_free - requested_bytes < reserved_bytes) {
+    return Status::IOError(Substitute("Insufficient disk space to allocate $0 bytes under path $1 "
+                                      "($2 bytes free vs $3 bytes reserved)",
+                                      requested_bytes, path, bytes_free, reserved_bytes),
+                           "", ENOSPC);
+  }
+  return Status::OK();
+}
+
 Status ReadFully(RandomAccessFile* file, uint64_t offset, size_t n,
                  Slice* result, uint8_t* scratch) {
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/env_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_util.h b/src/kudu/util/env_util.h
index e121f9c..f3f40ae 100644
--- a/src/kudu/util/env_util.h
+++ b/src/kudu/util/env_util.h
@@ -39,6 +39,13 @@ Status OpenFileForRandom(Env *env, const std::string &path,
 Status OpenFileForSequential(Env *env, const std::string &path,
                              std::shared_ptr<SequentialFile> *file);
 
+// Returns Status::IOError with POSIX code ENOSPC if writing 'requested_bytes'
+// bytes to the file system represented by 'path' would leave fewer than
+// 'reserved_bytes' bytes free. Otherwise returns OK.
+
+Status VerifySufficientDiskSpace(Env *env, const std::string& path,
+                                 int64_t requested_bytes, int64_t reserved_bytes);
+
 // Read exactly 'n' bytes from the given file. If fewer than 'n' bytes
 // are read, returns an IOError. This differs from the underlying
 // RandomAccessFile::Read(), which may return a "short read".

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/scoped_cleanup.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/scoped_cleanup.h b/src/kudu/util/scoped_cleanup.h
index 58aadad..e989331 100644
--- a/src/kudu/util/scoped_cleanup.h
+++ b/src/kudu/util/scoped_cleanup.h
@@ -47,4 +47,5 @@ template<typename F>
 ScopedCleanup<F> MakeScopedCleanup(F f) {
   return ScopedCleanup<F>(f);
 }
+
 } // namespace kudu